From 9ce6faf093caf83d99c0b1049962452e84fcee4f Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Tue, 25 Jun 2024 13:49:36 +0100
Subject: [PATCH] Speed up `LoadMode.DBT_LS` by caching dbt ls output in Airflow Variable (#1014)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Significantly improve `LoadMode.DBT_LS` performance. In the example DAGs tested, the task queueing time dropped from ~30s to ~0.5s, and the total DAG run time for Jaffle Shop dropped from 1 min 25s to 40s (more than 50%). Some users [reported improvements of 84%](https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343) in the DAG run time when trying out these changes. The difference can be even more significant on larger dbt projects.

The improvement was accomplished by caching the dbt ls output as an Airflow Variable. This is an alternative to #992, where we cached the pickled DAG/TaskGroup in a local file on the Airflow node. Unlike #992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal performance on every run: whenever the cache is regenerated, the scheduler or DAG processor will experience a delay. It was also observed that the hash used as the cache version can differ across platforms (e.g., `Darwin` and `Linux`), so deployments with a heterogeneous OS mix may regenerate the cache often.

Closes: #990
Closes: #1061

**Enabling/disabling this feature**

This feature is enabled by default. Users can disable it by setting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`.

**How the cache is refreshed**

Users can purge or delete the cache via the Airflow UI by identifying and deleting the cache key.

The cache is automatically refreshed if any file of the dbt project changes. Changes are detected by hashing (MD5) all the files in the project directory. Initially, this feature was implemented using the files' modified timestamps, but that did not work well for some Airflow deployments (e.g., `astro --dags`, since the timestamps change during deployment).

Additionally, if any of the following DAG configurations are changed, we'll automatically purge the cache of the DAGs that use that specific configuration:

* `ProjectConfig.dbt_vars`
* `ProjectConfig.env_vars`
* `ProjectConfig.partial_parse`
* `RenderConfig.env_vars`
* `RenderConfig.exclude`
* `RenderConfig.select`
* `RenderConfig.selector`

The following argument was introduced so users can define Airflow variables that, when changed, refresh the cache (it expects a list of Airflow variable names):

* `RenderConfig.airflow_vars_to_purge_dbt_ls_cache`

Example:
```
RenderConfig(
    airflow_vars_to_purge_dbt_ls_cache=["refresh_cache"]
)
```

**Cache key**

The Airflow variables that represent the dbt ls cache are prefixed by `cosmos_cache`. When using `DbtDag`, the keys use the DAG name. When using `DbtTaskGroup`, they combine the DAG name with the TaskGroup and any parent task groups.

Examples:
1. The `DbtDag` "basic_cosmos_dag" will have the cache represented by `"cosmos_cache__basic_cosmos_dag"`.
2. The `DbtTaskGroup` "customers" declared inside the DAG "basic_cosmos_task_group" will have the cache key `"cosmos_cache__basic_cosmos_task_group__customers"`.

**Cache value**

The cache values contain a few properties:

- `last_modified` timestamp, represented using the ISO 8601 format.
- `version` is a hash that represents the version of the dbt project and arguments used to run dbt ls by the time the cache was created - `dbt_ls_compressed` represents the dbt ls output compressed using zlib and encoded to base64 to be recorded as a string to the Airflow metadata database. Steps used to compress: ``` compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) encoded_data = base64.b64encode(compressed_data) dbt_ls_compressed = encoded_data.decode("utf-8") ``` We are compressing this value because it will be significant for larger dbt projects, depending on the selectors used, and we wanted this approach to be safe and not clutter the Airflow metadata database. Some numbers on the compression * A dbt project with 100 models can lead to a dbt ls output of 257k characters when using JSON. Zlib could compress it by 20x. * Another [real-life dbt project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads) with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It reduces to 489 KB after being compressed using `zlib` and encoded using `base64` - to 6% of the original size. * Maximum cell size in Postgres: 20MB The latency used to compress is in the order of milliseconds, not interfering in the performance of this solution. **Future work** * How this will affect the Airflow db in the long term * How does this performance compare to `ObjectStorage`? **Example of results before and after this change** Task queue times in Astro before the change: Screenshot 2024-06-03 at 11 15 26 Task queue times in Astro after the change on the second run of the DAG: Screenshot 2024-06-03 at 11 15 44 This feature will be available in `astronomer-cosmos==1.5.0a8`. --- .github/workflows/test.yml | 2 + .pre-commit-config.yaml | 1 + CHANGELOG.rst | 77 +++++++- cosmos/__init__.py | 2 +- cosmos/cache.py | 193 ++++++++++++++++++- cosmos/config.py | 1 + cosmos/constants.py | 1 + cosmos/converter.py | 23 ++- cosmos/dbt/graph.py | 225 ++++++++++++++++++---- cosmos/settings.py | 2 + dev/dags/basic_cosmos_task_group.py | 9 +- dev/dags/example_cosmos_cleanup_dag.py | 34 ++++ docs/configuration/caching.rst | 118 ++++++++++++ docs/configuration/cosmos-conf.rst | 16 ++ docs/configuration/index.rst | 1 + docs/configuration/parsing-methods.rst | 3 + docs/configuration/render-config.rst | 1 + pyproject.toml | 3 +- scripts/test/integration-setup.sh | 14 +- scripts/test/integration.sh | 12 ++ tests/dbt/test_graph.py | 247 ++++++++++++++++++++++++- tests/test_cache.py | 72 ++++++- 22 files changed, 994 insertions(+), 63 deletions(-) create mode 100644 dev/dags/example_cosmos_cleanup_dag.py create mode 100644 docs/configuration/caching.rst diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index afc32e702..96f0d5564 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -116,6 +116,7 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ github.event.pull_request.head.sha || github.ref }} + - uses: actions/cache@v3 with: path: | @@ -139,6 +140,7 @@ jobs: hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration env: + AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres DATABRICKS_HOST: mock diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 
86bfdbc74..a95bc2bdf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -82,6 +82,7 @@ repos: types-PyYAML, types-attrs, attrs, + types-pytz, types-requests, types-python-dateutil, apache-airflow, diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7757d5fb5..065209c99 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,83 @@ Changelog ========= +1.5.0a9 (2024-06-25) +-------------------- + +New Features + +* Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014 +* Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe +* Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016 +* Add node config to TaskInstance Context by @linchun3 in #1044 + +Bug fixes + +* Fix disk permission error in restricted env by @pankajastro in #1051 +* Add CSP header to iframe contents by @dwreeves in #1055 +* Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047 + +Enhancements + +* Support ``static_index.html`` docs by @dwreeves in #999 +* Support deep linking dbt docs via Airflow UI by @dwreeves in #1038 +* Add ability to specify host/port for Snowflake connection by @whummer in #1063 + +Others + +* Update documentation for DbtDocs generator by @arjunanan6 in #1043 +* Use uv in CI by @dwreeves in #1013 +* Cache hatch folder in the CI by @tatiana in #1056 +* Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054 +* Mark plugin integration tests as integration by @tatiana in #1057 +* Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049 +* Pre-commit hook updates in #1039, #1050, #1064 + + 1.4.3 (2024-06-07) ------------------ +------------------ + +Bug fixes + +* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033 + +Enhancements + +* Only run ``dbt deps`` when there are dependencies by @tatiana and @AlgirdasDubickas in #1030 + +Docs + +* Fix docs so it does not reference non-existing ``get_dbt_dataset`` by @tatiana in #1034 + + +v1.4.2 (2024-06-06) +------------------- + +Bug fixes + +* Fix the invocation mode for ``ExecutionMode.VIRTUALENV`` by @marco9663 in #1023 +* Fix Cosmos ``enable_cache`` setting by @tatiana in #1025 +* Make ``GoogleCloudServiceAccountDictProfileMapping`` dataset profile arg optional by @oliverrmaa and @pankajastro in #839 and #1017 +* Athena profile mapping set ``aws_session_token`` in profile only if it exists by @pankajastro in #1022 + +Others + +* Update dbt and Airflow conflicts matrix by @tatiana in #1026 +* Enable Python 3.12 unittest by @pankajastro in #1018 +* Improve error logging in ``DbtLocalBaseOperator`` by @davidsteinar in #1004 +* Add GitHub issue templates for bug reports and feature request by @pankajkoti in #1009 +* Add more fields in bug template to reduce turnaround in issue triaging by @pankajkoti in #1027 +* Fix ``dev/Dockerfile`` + Add ``uv pip install`` for faster build time by @dwreeves in #997 +* Drop support for Airflow 2.3 by @pankajkoti in #994 +* Update Astro Runtime image by @RNHTTR in #988 and #989 +* Enable ruff F linting by @pankajastro in #985 +* Move Cosmos Airflow configuration to settings.py by @pankajastro in #975 +* Fix CI Issues by @tatiana in #1005 +* Pre-commit hook updates in #1000, #1019 + + +1.4.1 (2024-05-17) +------------------ Bug fixes diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 555f97e06..ee860228a 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 
+5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.4.3" +__version__ = "1.5.0a9" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/cache.py b/cosmos/cache.py index b101366a0..fd1dd53f4 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,11 +1,22 @@ from __future__ import annotations +import functools +import hashlib +import json +import os import shutil +import time +from collections import defaultdict +from datetime import datetime, timedelta, timezone from pathlib import Path import msgpack +from airflow.models import DagRun, Variable from airflow.models.dag import DAG +from airflow.utils.session import provide_session from airflow.utils.task_group import TaskGroup +from sqlalchemy import select +from sqlalchemy.orm import Session from cosmos import settings from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME @@ -13,6 +24,24 @@ from cosmos.log import get_logger logger = get_logger(__name__) +VAR_KEY_CACHE_PREFIX = "cosmos_cache__" + + +def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]: + dag_id = None + task_group_id = None + cosmos_type = "DbtDag" + + if task_group: + if task_group.dag_id is not None: + dag_id = task_group.dag_id + if task_group.group_id is not None: + task_group_id = task_group.group_id + cosmos_type = "DbtTaskGroup" + else: + dag_id = dag.dag_id + + return {"cosmos_type": cosmos_type, "dag_id": dag_id, "task_group_id": task_group_id} # It was considered to create a cache identifier based on the dbt project path, as opposed @@ -28,16 +57,21 @@ def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str: :param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached :return: Unique identifier representing the cache """ - if task_group: - if task_group.dag_id is not None: - cache_identifiers_list = [task_group.dag_id] - if task_group.group_id is not None: - cache_identifiers_list.extend([task_group.group_id.replace(".", "__")]) - cache_identifier = "__".join(cache_identifiers_list) - else: - cache_identifier = dag.dag_id + metadata = _get_airflow_metadata(dag, task_group) + cache_identifiers_list = [] + dag_id = metadata.get("dag_id") + task_group_id = metadata.get("task_group_id") + + if dag_id: + cache_identifiers_list.append(dag_id) + if task_group_id: + cache_identifiers_list.append(task_group_id.replace(".", "__")) - return cache_identifier + return "__".join(cache_identifiers_list) + + +def create_cache_key(cache_identifier: str) -> str: + return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}" def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path: @@ -171,3 +205,144 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P if source_manifest_filepath.exists(): shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath)) + + +def _create_folder_version_hash(dir_path: Path) -> str: + """ + Given a directory, iterate through its content and create a hash that will change in case the + contents of the directory change. The value should not change if the values of the directory do not change, even if + the command is run from different Airflow instances. + + This method output must be concise and it currently changes based on operating system. 
+ """ + # This approach is less efficient than using modified time + # sum([path.stat().st_mtime for path in dir_path.glob("**/*")]) + # unfortunately, the modified time approach does not work well for dag-only deployments + # where DAGs are constantly synced to the deployed Airflow + # for 5k files, this seems to take 0.14 + hasher = hashlib.md5() + filepaths = [] + + for root_dir, dirs, files in os.walk(dir_path): + paths = [os.path.join(root_dir, filepath) for filepath in files] + filepaths.extend(paths) + + for filepath in sorted(filepaths): + with open(str(filepath), "rb") as fp: + buf = fp.read() + hasher.update(buf) + + return hasher.hexdigest() + + +def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str: + """ + Taking into account the project directory contents and the command arguments, calculate the + hash that represents the "dbt ls" command version - to be used to decide if the cache should be refreshed or not. + + :param cache_identifier: Unique identifier of the cache (may include DbtDag or DbtTaskGroup information) + :param project_path: Path to the target dbt project directory + :param cmd_args: List containing the arguments passed to the dbt ls command that would affect its output + """ + start_time = time.perf_counter() + + # Combined value for when the dbt project directory files were last modified + # This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder) + dbt_project_hash = _create_folder_version_hash(project_dir) + + # The performance for the following will depend on the user's configuration + hash_args = hashlib.md5("".join(cmd_args).encode()).hexdigest() + + elapsed_time = time.perf_counter() - start_time + logger.info( + f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}" + ) + return f"{dbt_project_hash},{hash_args}" + + +@functools.lru_cache +def was_project_modified(previous_version: str, current_version: str) -> bool: + """ + Given the cache version of a project and the latest version of the project, + decides if the project was modified or not. + """ + return previous_version != current_version + + +@provide_session +def delete_unused_dbt_ls_cache( + max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None +) -> int: + """ + Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs. + + Example usage: + + There are three Cosmos cache Airflow Variables: + 1. ``cache cosmos_cache__basic_cosmos_dag`` + 2. ``cosmos_cache__basic_cosmos_task_group__orders`` + 3. ``cosmos_cache__basic_cosmos_task_group__customers`` + + The first relates to the ``DbtDag`` ``basic_cosmos_dag`` and the two last ones relate to the DAG + ``basic_cosmos_task_group`` that has two ``DbtTaskGroups``: ``orders`` and ``customers``. + + Let's assume the last DAG run of ``basic_cosmos_dag`` was a week ago and the last DAG run of + ``basic_cosmos_task_group`` was an hour ago. 
+ + To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 5 days ago: + + ..code: python + >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5)) + INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag + + To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 minutes ago: + + ..code: python + >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10)) + INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag + INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders + INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders + + To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago + + ..code: python + >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10)) + + In this last example, nothing is deleted. + """ + if session is None: + return 0 + + logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}") + cosmos_dags_ids = defaultdict(list) + all_variables = session.scalars(select(Variable)).all() + total_cosmos_variables = 0 + deleted_cosmos_variables = 0 + + # Identify Cosmos-related cache in Airflow variables + for var in all_variables: + if var.key.startswith(VAR_KEY_CACHE_PREFIX): + var_value = json.loads(var.val) + cosmos_dags_ids[var_value["dag_id"]].append(var.key) + total_cosmos_variables += 1 + + # Delete DAGs that have not been run in the last X time + for dag_id, vars_keys in cosmos_dags_ids.items(): + last_dag_run = ( + session.query(DagRun) + .filter( + DagRun.dag_id == dag_id, + ) + .order_by(DagRun.execution_date.desc()) + .first() + ) + if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): + for var_key in vars_keys: + logger.info(f"Removing the dbt ls cache {var_key}") + Variable.delete(var_key) + deleted_cosmos_variables += 1 + + logger.info( + f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. 
" + ) + return deleted_cosmos_variables diff --git a/cosmos/config.py b/cosmos/config.py index 13622563e..5ca21709d 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -71,6 +71,7 @@ class RenderConfig: dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) enable_mock_profile: bool = True + airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/cosmos/constants.py b/cosmos/constants.py index 92bf883b2..2a1abb20e 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -36,6 +36,7 @@ class LoadMode(Enum): CUSTOM = "custom" DBT_LS = "dbt_ls" DBT_LS_FILE = "dbt_ls_file" + DBT_LS_CACHE = "dbt_ls_cache" DBT_MANIFEST = "dbt_manifest" diff --git a/cosmos/converter.py b/cosmos/converter.py index 5e415486e..40929ef55 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -5,6 +5,9 @@ import copy import inspect +import os +import platform +import time from typing import Any, Callable from warnings import warn @@ -225,19 +228,31 @@ def __init__( dbt_vars = project_config.dbt_vars or operator_args.get("vars") cache_dir = None + cache_identifier = None if settings.enable_cache: - cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache._create_cache_identifier(dag, task_group)) + cache_identifier = cache._create_cache_identifier(dag, task_group) + cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier) + previous_time = time.perf_counter() self.dbt_graph = DbtGraph( project=project_config, render_config=render_config, execution_config=execution_config, profile_config=profile_config, cache_dir=cache_dir, + cache_identifier=cache_identifier, dbt_vars=dbt_vars, + airflow_metadata=cache._get_airflow_metadata(dag, task_group), ) self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) + current_time = time.perf_counter() + elapsed_time = current_time - previous_time + logger.info( + f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to parse the dbt project for DAG using {self.dbt_graph.load_method}" + ) + previous_time = current_time + task_args = { **operator_args, "project_dir": execution_config.project_path, @@ -272,3 +287,9 @@ def __init__( on_warning_callback=on_warning_callback, render_config=render_config, ) + + current_time = time.perf_counter() + elapsed_time = current_time - previous_time + logger.info( + f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG." 
+ ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index bd3181a20..8dba0a92e 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -1,17 +1,23 @@ from __future__ import annotations +import base64 +import datetime +import functools import itertools import json import os +import platform import tempfile +import zlib from dataclasses import dataclass, field +from functools import cached_property from pathlib import Path from subprocess import PIPE, Popen from typing import Any -import yaml +from airflow.models import Variable -from cosmos import cache +from cosmos import cache, settings from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, @@ -116,7 +122,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> return stdout -def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode]: +def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, DbtNode]: """Parses the output of `dbt ls` into a dictionary of `DbtNode` instances.""" nodes = {} for line in ls_stdout.split("\n"): @@ -148,6 +154,8 @@ class DbtGraph: nodes: dict[str, DbtNode] = dict() filtered_nodes: dict[str, DbtNode] = dict() + load_method: LoadMode = LoadMode.AUTOMATIC + current_version: str = "" def __init__( self, @@ -156,16 +164,135 @@ def __init__( execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, cache_dir: Path | None = None, - # dbt_vars only supported for LegacyDbtProject + cache_identifier: str = "", dbt_vars: dict[str, str] | None = None, + airflow_metadata: dict[str, str] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config self.cache_dir = cache_dir + self.airflow_metadata = airflow_metadata or {} + if cache_identifier: + self.dbt_ls_cache_key = cache.create_cache_key(cache_identifier) + else: + self.dbt_ls_cache_key = "" self.dbt_vars = dbt_vars or {} + @cached_property + def env_vars(self) -> dict[str, str]: + """ + User-defined environment variables, relevant to running dbt ls. + """ + return self.render_config.env_vars or self.project.env_vars or {} + + @cached_property + def project_path(self) -> Path: + """ + Return the user-defined path to their dbt project. Tries to retrieve the configuration from render_config and + (legacy support) ExecutionConfig, where it was originally defined. + """ + # we're considering the execution_config only due to backwards compatibility + path = self.render_config.project_path or self.project.dbt_project_path or self.execution_config.project_path + if not path: + raise CosmosLoadDbtException( + "Unable to load project via dbt ls without RenderConfig.dbt_project_path, ProjectConfig.dbt_project_path or ExecutionConfig.dbt_project_path" + ) + return path.absolute() + + @cached_property + def dbt_ls_args(self) -> list[str]: + """ + Flags set while running dbt ls. This information is also used to define the dbt ls cache key. 
+ """ + ls_args = [] + if self.render_config.exclude: + ls_args.extend(["--exclude", *self.render_config.exclude]) + + if self.render_config.select: + ls_args.extend(["--select", *self.render_config.select]) + + if self.project.dbt_vars: + ls_args.extend(["--vars", json.dumps(self.project.dbt_vars, sort_keys=True)]) + + if self.render_config.selector: + ls_args.extend(["--selector", self.render_config.selector]) + + if not self.project.partial_parse: + ls_args.append("--no-partial-parse") + + return ls_args + + @cached_property + def dbt_ls_cache_key_args(self) -> list[str]: + """ + Values that are used to represent the dbt ls cache key. If any parts are changed, the dbt ls command will be + executed and the new value will be stored. + """ + # if dbt deps, we can consider the md5 of the packages or deps file + cache_args = list(self.dbt_ls_args) + env_vars = self.env_vars + if env_vars: + envvars_str = json.dumps(env_vars, sort_keys=True) + cache_args.append(envvars_str) + if self.render_config.airflow_vars_to_purge_dbt_ls_cache: + for var_name in self.render_config.airflow_vars_to_purge_dbt_ls_cache: + airflow_vars = [var_name, Variable.get(var_name, "")] + cache_args.extend(airflow_vars) + + logger.debug(f"Value of `dbt_ls_cache_key_args` for <{self.dbt_ls_cache_key}>: {cache_args}") + return cache_args + + def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: + """ + Store compressed dbt ls output into an Airflow Variable. + + Stores: + { + "version": "cache-version", + "dbt_ls_compressed": "compressed dbt ls output", + "last_modified": "Isoformat timestamp" + } + """ + # This compression reduces the dbt ls output to 10% of the original size + compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) + encoded_data = base64.b64encode(compressed_data) + dbt_ls_compressed = encoded_data.decode("utf-8") + cache_dict = { + "version": cache._calculate_dbt_ls_cache_current_version( + self.dbt_ls_cache_key, self.project_path, self.dbt_ls_cache_key_args + ), + "dbt_ls_compressed": dbt_ls_compressed, + "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), + **self.airflow_metadata, + } + Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + + def get_dbt_ls_cache(self) -> dict[str, str]: + """ + Retrieve previously saved dbt ls cache from an Airflow Variable, decompressing the dbt ls output. + + Outputs: + { + "version": "cache-version", + "dbt_ls": "uncompressed dbt ls output", + "last_modified": "Isoformat timestamp" + } + """ + cache_dict: dict[str, str] = {} + try: + cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + except (json.decoder.JSONDecodeError, KeyError): + return cache_dict + else: + dbt_ls_compressed = cache_dict.pop("dbt_ls_compressed", None) + if dbt_ls_compressed: + encoded_data = base64.b64decode(dbt_ls_compressed.encode()) + cache_dict["dbt_ls"] = zlib.decompress(encoded_data).decode() + + return cache_dict + def load( self, method: LoadMode = LoadMode.AUTOMATIC, @@ -181,11 +308,11 @@ def load( Fundamentally, there are two different execution paths There is automatic, and manual. 
""" - load_method = { LoadMode.CUSTOM: self.load_via_custom_parser, LoadMode.DBT_LS: self.load_via_dbt_ls, LoadMode.DBT_LS_FILE: self.load_via_dbt_ls_file, + LoadMode.DBT_LS_CACHE: self.load_via_dbt_ls_cache, LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, } @@ -214,22 +341,9 @@ def run_dbt_ls( """Runs dbt ls command and returns the parsed nodes.""" ls_command = [dbt_cmd, "ls", "--output", "json"] - if self.render_config.exclude: - ls_command.extend(["--exclude", *self.render_config.exclude]) - - if self.render_config.select: - ls_command.extend(["--select", *self.render_config.select]) - - if self.project.dbt_vars: - ls_command.extend(["--vars", yaml.dump(self.project.dbt_vars)]) - - if self.render_config.selector: - ls_command.extend(["--selector", self.render_config.selector]) - - if not self.project.partial_parse: - ls_command.append("--no-partial-parse") - + ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) + ls_command.extend(ls_args) stdout = run_command(ls_command, tmp_dir, env_vars) @@ -241,10 +355,56 @@ def run_dbt_ls( for line in logfile: logger.debug(line.strip()) + if self.should_use_dbt_ls_cache(): + self.save_dbt_ls_cache(stdout) + nodes = parse_dbt_ls_output(project_path, stdout) return nodes def load_via_dbt_ls(self) -> None: + """Retrieve the dbt ls cache if enabled and available or run dbt ls""" + if not self.load_via_dbt_ls_cache(): + self.load_via_dbt_ls_without_cache() + + @functools.lru_cache + def should_use_dbt_ls_cache(self) -> bool: + """Identify if Cosmos should use/store dbt ls cache or not.""" + return settings.enable_cache and settings.enable_cache_dbt_ls and bool(self.dbt_ls_cache_key) + + def load_via_dbt_ls_cache(self) -> bool: + """(Try to) load dbt ls cache from an Airflow Variable""" + + logger.info(f"Trying to parse the dbt project using dbt ls cache {self.dbt_ls_cache_key}...") + if self.should_use_dbt_ls_cache(): + project_path = self.project_path + + cache_dict = self.get_dbt_ls_cache() + if not cache_dict: + logger.info(f"Cosmos performance: Cache miss for {self.dbt_ls_cache_key}") + return False + + cache_version = cache_dict.get("version") + dbt_ls_cache = cache_dict.get("dbt_ls") + + current_version = cache._calculate_dbt_ls_cache_current_version( + self.dbt_ls_cache_key, project_path, self.dbt_ls_cache_key_args + ) + + if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version): + logger.info( + f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.dbt_ls_cache_key} is {len(dbt_ls_cache)}" + ) + self.load_method = LoadMode.DBT_LS_CACHE + + nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache) + self.nodes = nodes + self.filtered_nodes = nodes + logger.info(f"Cosmos performance: Cache hit for {self.dbt_ls_cache_key} - {current_version}") + return True + logger.info(f"Cosmos performance: Cache miss for {self.dbt_ls_cache_key} - skipped") + return False + + def load_via_dbt_ls_without_cache(self) -> None: """ This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command line for both parsing and filtering the nodes. 
@@ -253,37 +413,33 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.DBT_LS self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) dbt_cmd = self.render_config.dbt_executable_path dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") - if not self.render_config.project_path or not self.execution_config.project_path: - raise CosmosLoadDbtException( - "Unable to load project via dbt ls without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" - ) - + project_path = self.project_path if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") with tempfile.TemporaryDirectory() as tmpdir: - logger.info( + logger.debug( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" ) tmpdir_path = Path(tmpdir) - abs_project_path = self.render_config.project_path.absolute() - create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps) + create_symlinks(project_path, tmpdir_path, self.render_config.dbt_deps) if self.project.partial_parse and self.cache_dir: - latest_partial_parse = cache._get_latest_partial_parse(abs_project_path, self.cache_dir) + latest_partial_parse = cache._get_latest_partial_parse(project_path, self.cache_dir) logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) if latest_partial_parse is not None: cache._copy_partial_parse_to_project(latest_partial_parse, tmpdir_path) with self.profile_config.ensure_profile( use_mock_values=self.render_config.enable_mock_profile - ) as profile_values, environ(self.project.env_vars or self.render_config.env_vars or {}): + ) as profile_values, environ(self.env_vars): (profile_path, env_vars) = profile_values env = os.environ.copy() env.update(env_vars) @@ -303,15 +459,13 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.render_config.dbt_deps and has_non_empty_dependencies_file( - Path(self.render_config.project_path) - ): + if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path): deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(dbt_cmd, self.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes @@ -330,6 +484,7 @@ def load_via_dbt_ls_file(self) -> None: This technically should increase performance and also removes the necessity to have your whole dbt project copied to the airflow image. 
""" + self.load_method = LoadMode.DBT_LS_FILE logger.info("Trying to parse the dbt project `%s` using a dbt ls output file...", self.project.project_name) if not self.render_config.is_dbt_ls_file_available(): @@ -357,6 +512,7 @@ def load_via_custom_parser(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.CUSTOM logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) if self.render_config.selector: @@ -415,6 +571,7 @@ def load_from_dbt_manifest(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.DBT_MANIFEST logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.project_name) if self.render_config.selector: diff --git a/cosmos/settings.py b/cosmos/settings.py index fc5954131..68ed8758f 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -11,6 +11,8 @@ DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True) +enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True) +enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True) propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 4221e3019..d63cf2c92 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -43,10 +43,13 @@ def basic_cosmos_task_group() -> None: customers = DbtTaskGroup( group_id="customers", - project_config=ProjectConfig( - (DBT_ROOT_PATH / "jaffle_shop").as_posix(), + project_config=ProjectConfig((DBT_ROOT_PATH / "jaffle_shop").as_posix(), dbt_vars={"var": "2"}), + render_config=RenderConfig( + select=["path:seeds/raw_customers.csv"], + enable_mock_profile=False, + env_vars={"PURGE": os.getenv("PURGE", "0")}, + airflow_vars_to_purge_dbt_ls_cache=["purge"], ), - render_config=RenderConfig(select=["path:seeds/raw_customers.csv"], enable_mock_profile=False), execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, diff --git a/dev/dags/example_cosmos_cleanup_dag.py b/dev/dags/example_cosmos_cleanup_dag.py new file mode 100644 index 000000000..c93bdf002 --- /dev/null +++ b/dev/dags/example_cosmos_cleanup_dag.py @@ -0,0 +1,34 @@ +""" +Example of cleanup DAG that can be used to clear cache originated from running the dbt ls command while +parsing the DbtDag or DbtTaskGroup since Cosmos 1.5. +""" + +# [START cache_example] +from datetime import datetime, timedelta + +from airflow.decorators import dag, task + +from cosmos.cache import delete_unused_dbt_ls_cache + + +@dag( + schedule_interval="0 0 * * 0", # Runs every Sunday + start_date=datetime(2023, 1, 1), + catchup=False, + tags=["example"], +) +def example_cosmos_cleanup_dag(): + + @task() + def clear_db_ls_cache(session=None): + """ + Delete the dbt ls cache that has not been used for the last five days. 
+ """ + delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5)) + + clear_db_ls_cache() + + +# [END cache_example] + +example_cosmos_cleanup_dag() diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst new file mode 100644 index 000000000..b5ec155da --- /dev/null +++ b/docs/configuration/caching.rst @@ -0,0 +1,118 @@ +.. _caching: + +Caching +======= + +This page explains the caching strategies used by ``astronomer-cosmos``. + +All Cosmos caching mechanisms can be enabled or turned off in the ``airflow.cfg`` file or using environment variables. + +.. note:: + For more information, see `configuring a Cosmos project <./project-config.html>`_. + +Depending on the Cosmos version, it creates a cache for two types of data: + +- The ``dbt ls`` output +- The dbt ``partial_parse.msgpack`` file + +It is possible to turn off any cache in Cosmos by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE=0``. +Disabling individual types of cache in Cosmos is also possible, as explained below. + +Caching the dbt ls output +~~~~~~~~~~~~~~~~~~~~~~~~~ + +(Introduced in Cosmos 1.5) + +While parsing a dbt project using `LoadMode.DBT_LS <./parsing-methods.html#dbt-ls>`_, Cosmos uses a subprocess to run ``dbt ls``. +This operation can be very costly; it can increase the DAG parsing times and affect not only the scheduler DAG processing but +also the task queueing time. + +Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output +of this command as an `Airflow Variable `_. +Based on an initial `analysis `_, enabling this setting reduced some DAGs' task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. + +This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``. + +**How the cache is refreshed** + +Users can purge or delete the cache via the Airflow UI by identifying and deleting the cache key. + +Cosmos will refresh the cache in a few circumstances: + +* if any files of the dbt project change +* if one of the arguments that affect the dbt ls command execution changes + +To evaluate whether the dbt project changed, Cosmos calculates the MD5 of all the files in the project directory. + +Additionally, if any of the following DAG configurations are changed, we'll automatically purge the cache of the DAGs that use that specific configuration: + +* ``ProjectConfig.dbt_vars`` +* ``ProjectConfig.env_vars`` +* ``ProjectConfig.partial_parse`` +* ``RenderConfig.env_vars`` +* ``RenderConfig.exclude`` +* ``RenderConfig.select`` +* ``RenderConfig.selector`` + +Finally, if users would like to define specific Airflow variables that, if changed, will cause the recreation of the cache, they can specify those by using: + +* ``RenderConfig.airflow_vars_to_purge_dbt_ls_cache`` + +Example: + +.. code-block:: python + + RenderConfig(airflow_vars_to_purge_dbt_ls_cache=["refresh_cache"]) + +**Cleaning up stale cache** + +From time to time, Cosmos DbtDags and DbtTaskGroups may be renamed or deleted. In those cases, to clean up the Airflow metadata database, it is possible to use the method ``delete_unused_dbt_ls_cache``. + +The method deletes the Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs. + +As an example, the following clean-up DAG will delete any cache associated with Cosmos that has not been used for the last five days: + +..
literalinclude:: ../../dev/dags/example_cosmos_cleanup_dag.py + :language: python + :start-after: [START cache_example] + :end-before: [END cache_example] + +**Cache key** + +The Airflow variables that represent the dbt ls cache are prefixed by ``cosmos_cache``. +When using ``DbtDag``, the keys use the DAG name. When using ``DbtTaskGroup``, they combine the DAG name with the ``TaskGroup`` and any parent task groups. + +Examples: + +* The ``DbtDag`` "basic_cosmos_dag" will have the cache represented by "cosmos_cache__basic_cosmos_dag". +* The ``DbtTaskGroup`` "customers" declared inside the DAG "basic_cosmos_task_group" will have the cache key "cosmos_cache__basic_cosmos_task_group__customers". + +**Cache value** + +The cache values contain a few properties: + +* ``last_modified`` timestamp, represented using the ISO 8601 format. +* ``version`` is a hash that represents the version of the dbt project and the arguments used to run dbt ls at the time Cosmos created the cache +* ``dbt_ls_compressed`` represents the dbt ls output compressed using zlib and encoded to base64 so Cosmos can record the value as a compressed string in the Airflow metadata database. +* ``dag_id`` is the DAG associated with this cache +* ``task_group_id`` is the TaskGroup associated with this cache +* ``cosmos_type`` is either ``DbtDag`` or ``DbtTaskGroup`` + + +Caching the partial parse file +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +(Introduced in Cosmos 1.4) + +After parsing the dbt project, dbt stores an internal project manifest in a file called ``partial_parse.msgpack`` (`official docs `_). +This file contributes significantly to the performance of running dbt commands when the dbt project has not changed. + +Cosmos 1.4 introduced `support for partial parse files `_, which can either be +provided by the user or stored by Cosmos in a temporary folder on the Airflow scheduler and worker nodes after +running dbt commands. + +Users can customize where to store the cache using the setting ``AIRFLOW__COSMOS__CACHE_DIR``. + +It is possible to switch off this feature by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE=0``. + +For more information, read the `Cosmos partial parsing documentation <./partial-parsing.html>`_ diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 1d334884f..9c1b56c89 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -30,6 +30,22 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``True`` - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE`` +.. _enable_cache_dbt_ls: + +`enable_cache_dbt_ls`_: + Enable or disable caching the ``dbt ls`` output in an Airflow Variable when using ``LoadMode.DBT_LS``. + + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS`` + +.. _enable_cache_partial_parse: + +`enable_cache_partial_parse`_: + Enable or disable caching of dbt partial parse files on the local disk. + + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE`` + .. _propagate_logs: `propagate_logs`_: diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index fc34b993e..90f195938 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -25,3 +25,4 @@ Cosmos offers a number of configuration options to customize its behavior.
For m Operator Args Compiled SQL Logging + Caching diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst index 14dafb021..ebd6030e6 100644 --- a/docs/configuration/parsing-methods.rst +++ b/docs/configuration/parsing-methods.rst @@ -66,6 +66,9 @@ If you don't have a ``manifest.json`` file, Cosmos will attempt to generate one When Cosmos runs ``dbt ls``, it also passes your ``select`` and ``exclude`` arguments to the command. This means that Cosmos will only generate a manifest for the models you want to run. +Starting in Cosmos 1.5, Cosmos will cache the output of the ``dbt ls`` command, to improve the performance of this +parsing method. Learn more `here <./caching.html>`_. + To use this: .. code-block:: python diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index f3e216712..4b2535e07 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -17,6 +17,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. - ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` +- ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information. Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index c8bee0b20..6c518613b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,7 @@ tests = [ "pytest-cov", "pytest-describe", "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html + "types-pytz", "types-requests", "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html "pre-commit", @@ -136,7 +137,7 @@ dependencies = [ "types-requests", "types-python-dateutil", "Werkzeug<3.0.0", - "apache-airflow=={matrix:airflow}.0", + "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670 ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index eba4f1513..c6e106fd5 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -1,6 +1,14 @@ +#!/bin/bash + +set -v +set -x +set -e + # we install using the following workaround to overcome installation conflicts, such as: # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies -pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; \ -rm -rf airflow.*; \ -airflow db init; \ +pip uninstall -y dbt-postgres dbt-databricks dbt-vertica +rm -rf airflow.* +pip freeze | grep airflow +airflow db reset -y +airflow db init pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index 
823f70a7e..1d8264768 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -1,3 +1,15 @@ +#!/bin/bash + +set -x +set -e + +pip freeze | grep airflow +echo $AIRFLOW_HOME +ls $AIRFLOW_HOME + +airflow db check + + rm -rf dbt/jaffle_shop/dbt_packages; pytest -vv \ --cov=cosmos \ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 652a81482..9e931ba8c 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1,12 +1,17 @@ +import importlib +import os import shutil +import sys import tempfile +from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen from unittest.mock import MagicMock, patch import pytest -import yaml +from airflow.models import Variable +from cosmos import settings from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( @@ -587,7 +592,7 @@ def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): ) with pytest.raises(CosmosLoadDbtException) as err_info: - dbt_graph.load_via_dbt_ls() + dbt_graph.load_via_dbt_ls_without_cache() expected = "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." assert err_info.value.args[0] == expected @@ -658,12 +663,12 @@ def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_p (tmp_path / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True) # First time dbt ls is run, partial parsing was not cached, so we don't benefit from this - dbt_graph.load_via_dbt_ls() + dbt_graph.load_via_dbt_ls_without_cache() assert "Unable to do partial parsing" in caplog.text # From the second time we run dbt ls onwards, we benefit from partial parsing caplog.clear() - dbt_graph.load_via_dbt_ls() # should not not raise exception + dbt_graph.load_via_dbt_ls_without_cache() # should not not raise exception assert not "Unable to do partial parsing" in caplog.text @@ -978,10 +983,13 @@ def test_parse_dbt_ls_output_with_json_without_tags_or_config(): assert expected_nodes == nodes +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") -def test_load_via_dbt_ls_project_config_env_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): +def test_load_via_dbt_ls_project_config_env_vars( + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir +): """Tests that the dbt ls command in the subprocess has the project config env vars set.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 @@ -1006,10 +1014,13 @@ def test_load_via_dbt_ls_project_config_env_vars(mock_validate, mock_update_node assert mock_popen.call_args.kwargs["env"]["MY_ENV_VAR"] == "my_value" +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") -def test_load_via_dbt_ls_project_config_dbt_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): +def test_load_via_dbt_ls_project_config_dbt_vars( + mock_validate, mock_update_nodes, mock_popen, mock_use_case, tmp_dbt_project_dir +): """Tests that the dbt ls command in the subprocess has "--vars" with the 
project config dbt_vars.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 @@ -1031,14 +1042,15 @@ def test_load_via_dbt_ls_project_config_dbt_vars(mock_validate, mock_update_node dbt_graph.load_via_dbt_ls() ls_command = mock_popen.call_args.args[0] assert "--vars" in ls_command - assert ls_command[ls_command.index("--vars") + 1] == yaml.dump(dbt_vars) + assert ls_command[ls_command.index("--vars") + 1] == '{"my_var1": "my_value1", "my_var2": "my_value2"}' +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") def test_load_via_dbt_ls_render_config_selector_arg_is_used( - mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir ): """Tests that the dbt ls command in the subprocess has "--selector" with the RenderConfig.selector.""" mock_popen().communicate.return_value = ("", "") @@ -1068,11 +1080,12 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( assert ls_command[ls_command.index("--selector") + 1] == selector +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") def test_load_via_dbt_ls_render_config_no_partial_parse( - mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir ): """Tests that --no-partial-parse appears when partial_parse=False.""" mock_popen().communicate.return_value = ("", "") @@ -1180,3 +1193,219 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile assert "seed.jaffle_shop.raw_customers" in filtered_nodes # Two tests should be filtered assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 2 + + +@pytest.mark.parametrize( + "render_config,project_config,expected_envvars", + [ + (RenderConfig(), ProjectConfig(), {}), + (RenderConfig(env_vars={"a": 1}), ProjectConfig(), {"a": 1}), + (RenderConfig(), ProjectConfig(env_vars={"b": 2}), {"b": 2}), + (RenderConfig(env_vars={"a": 1}), ProjectConfig(env_vars={"b": 2}), {"a": 1}), + ], +) +def test_env_vars(render_config, project_config, expected_envvars): + graph = DbtGraph( + project=project_config, + render_config=render_config, + ) + assert graph.env_vars == expected_envvars + + +def test_project_path_fails(): + graph = DbtGraph(project=ProjectConfig()) + with pytest.raises(CosmosLoadDbtException) as e: + graph.project_path + + expected = "Unable to load project via dbt ls without RenderConfig.dbt_project_path, ProjectConfig.dbt_project_path or ExecutionConfig.dbt_project_path" + assert e.value.args[0] == expected + + +@pytest.mark.parametrize( + "render_config,project_config,expected_dbt_ls_args", + [ + (RenderConfig(), ProjectConfig(), []), + (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), ["--exclude", "package:snowplow"]), + ( + RenderConfig(select=["tag:prod", "config.materialized:incremental"]), + ProjectConfig(), + ["--select", "tag:prod", "config.materialized:incremental"], + ), + (RenderConfig(selector="nightly"), ProjectConfig(), ["--selector", "nightly"]), + (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), ["--vars", '{"a": 1}']), + (RenderConfig(), 
ProjectConfig(partial_parse=False), ["--no-partial-parse"]),
+        (
+            RenderConfig(exclude=["1", "2"], select=["a", "b"], selector="nightly"),
+            ProjectConfig(dbt_vars={"a": 1}, partial_parse=False),
+            [
+                "--exclude",
+                "1",
+                "2",
+                "--select",
+                "a",
+                "b",
+                "--vars",
+                '{"a": 1}',
+                "--selector",
+                "nightly",
+                "--no-partial-parse",
+            ],
+        ),
+    ],
+)
+def test_dbt_ls_args(render_config, project_config, expected_dbt_ls_args):
+    graph = DbtGraph(
+        project=project_config,
+        render_config=render_config,
+    )
+    assert graph.dbt_ls_args == expected_dbt_ls_args
+
+
+def test_dbt_ls_cache_key_args_sorts_envvars():
+    project_config = ProjectConfig(env_vars={11: "November", 12: "December", 5: "May"})
+    graph = DbtGraph(project=project_config)
+    assert graph.dbt_ls_cache_key_args == ['{"5": "May", "11": "November", "12": "December"}']
+
+
+@pytest.fixture()
+def airflow_variable():
+    key = "cosmos_cache__undefined"
+    value = "some_value"
+    Variable.set(key, value)
+
+    yield key, value
+
+    Variable.delete(key)
+
+
+@pytest.mark.integration
+def test_dbt_ls_cache_key_args_uses_airflow_vars_to_purge_dbt_ls_cache(airflow_variable):
+    key, value = airflow_variable
+    graph = DbtGraph(project=ProjectConfig(), render_config=RenderConfig(airflow_vars_to_purge_dbt_ls_cache=[key]))
+    assert graph.dbt_ls_cache_key_args == [key, value]
+
+
+@patch("cosmos.dbt.graph.datetime")
+@patch("cosmos.dbt.graph.Variable.set")
+def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir):
+    mock_datetime.datetime.now.return_value = datetime(2022, 1, 1, 12, 0, 0)
+    graph = DbtGraph(cache_identifier="something", project=ProjectConfig(dbt_project_path=tmp_dbt_project_dir))
+    dbt_ls_output = "some output"
+    graph.save_dbt_ls_cache(dbt_ls_output)
+    assert mock_variable_set.call_args[0][0] == "cosmos_cache__something"
+    assert mock_variable_set.call_args[0][1]["dbt_ls_compressed"] == "eJwrzs9NVcgvLSkoLQEAGpAEhg=="
+    assert mock_variable_set.call_args[0][1]["last_modified"] == "2022-01-01T12:00:00"
+    version = mock_variable_set.call_args[0][1].get("version")
+    hash_dir, hash_args = version.split(",")
+    assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
+    if sys.platform == "darwin":
+        assert hash_dir == "cdc6f0bec00f4edc616f3aa755a34330"
+    else:
+        assert hash_dir == "77d08d6da374330ac1b49438ff2873f7"
+
+
+@pytest.mark.integration
+def test_get_dbt_ls_cache_returns_empty_if_non_json_var(airflow_variable):
+    graph = DbtGraph(project=ProjectConfig())
+    assert graph.get_dbt_ls_cache() == {}
+
+
+@patch("cosmos.dbt.graph.Variable.get", return_value={"dbt_ls_compressed": "eJwrzs9NVcgvLSkoLQEAGpAEhg=="})
+def test_get_dbt_ls_cache_returns_decoded_and_decompressed_value(mock_variable_get):
+    graph = DbtGraph(project=ProjectConfig())
+    assert graph.get_dbt_ls_cache() == {"dbt_ls": "some output"}
+
+
+@patch("cosmos.dbt.graph.Variable.get", return_value={})
+def test_get_dbt_ls_cache_returns_empty_dict_if_empty_dict_var(mock_variable_get):
+    graph = DbtGraph(project=ProjectConfig())
+    assert graph.get_dbt_ls_cache() == {}
+
+
+@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_without_cache")
+@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_cache", return_value=True)
+def test_load_via_dbt_ls_does_not_call_without_cache(mock_cache, mock_without_cache):
+    graph = DbtGraph(project=ProjectConfig())
+    graph.load_via_dbt_ls()
+    assert mock_cache.called
+    assert not mock_without_cache.called
+
+
+@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_without_cache")
+@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_cache", return_value=False)
+def test_load_via_dbt_ls_calls_without_cache(mock_cache, mock_without_cache):
+    graph = DbtGraph(project=ProjectConfig())
+    graph.load_via_dbt_ls()
+    assert mock_cache.called
+    assert mock_without_cache.called
+
+
+@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False)
+def test_load_via_dbt_ls_cache_is_false_if_disabled(mock_should_use_dbt_ls_cache):
+    graph = DbtGraph(project=ProjectConfig())
+    assert not graph.load_via_dbt_ls_cache()
+    assert mock_should_use_dbt_ls_cache.called
+
+
+@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={})
+@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True)
+def test_load_via_dbt_ls_cache_is_false_if_no_cache(mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache):
+    graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp"))
+    assert not graph.load_via_dbt_ls_cache()
+    assert mock_should_use_dbt_ls_cache.called
+    assert mock_get_dbt_ls_cache.called
+
+
+@patch("cosmos.dbt.graph.cache._calculate_dbt_ls_cache_current_version", return_value=1)
+@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={"version": 2, "dbt_ls": "output"})
+@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True)
+def test_load_via_dbt_ls_cache_is_false_if_cache_is_outdated(
+    mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache, mock_calculate_current_version
+):
+    graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp"))
+    assert not graph.load_via_dbt_ls_cache()
+    assert mock_should_use_dbt_ls_cache.called
+    assert mock_get_dbt_ls_cache.called
+    assert mock_calculate_current_version.called
+
+
+@patch("cosmos.dbt.graph.parse_dbt_ls_output", return_value={"some-node": {}})
+@patch("cosmos.dbt.graph.cache._calculate_dbt_ls_cache_current_version", return_value=1)
+@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={"version": 1, "dbt_ls": "output"})
+@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True)
+def test_load_via_dbt_ls_cache_is_true(
+    mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache, mock_calculate_current_version, mock_parse_dbt_ls_output
+):
+    graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp"))
+    assert graph.load_via_dbt_ls_cache()
+    assert graph.load_method == LoadMode.DBT_LS_CACHE
+    assert graph.nodes == {"some-node": {}}
+    assert graph.filtered_nodes == {"some-node": {}}
+    assert mock_should_use_dbt_ls_cache.called
+    assert mock_get_dbt_ls_cache.called
+    assert mock_calculate_current_version.called
+    assert mock_parse_dbt_ls_output.called
+
+
+@pytest.mark.parametrize(
+    "enable_cache,enable_cache_dbt_ls,cache_id,should_use",
+    [
+        (False, True, "id", False),
+        (True, False, "id", False),
+        (False, False, "id", False),
+        (True, True, "", False),
+        (True, True, "id", True),
+    ],
+)
+def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, should_use):
+    with patch.dict(
+        os.environ,
+        {
+            "AIRFLOW__COSMOS__ENABLE_CACHE": str(enable_cache),
+            "AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS": str(enable_cache_dbt_ls),
+        },
+    ):
+        importlib.reload(settings)
+        graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp"))
+        graph.should_use_dbt_ls_cache.cache_clear()
+        assert graph.should_use_dbt_ls_cache() == should_use
diff --git a/tests/test_cache.py b/tests/test_cache.py
index 7d6a2d36c..9cd216998 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -2,12 +2,14 @@
 import shutil
 import tempfile
 import time
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from unittest.mock import call, patch
 
 import pytest
 from airflow import DAG
+from airflow.models import DagRun, Variable
+from airflow.utils.db import create_session
 from airflow.utils.task_group import TaskGroup
 
 from cosmos.cache import (
@@ -15,6 +17,7 @@
     _create_cache_identifier,
     _get_latest_partial_parse,
     _update_partial_parse_cache,
+    delete_unused_dbt_ls_cache,
 )
 from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME
 
@@ -112,3 +115,70 @@ def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile):
         call(str(latest_partial_parse_filepath.parent / "manifest.json"), str(manifest_path)),
     ]
     mock_copyfile.assert_has_calls(calls)
+
+
+@pytest.fixture
+def vars_session():
+    with create_session() as session:
+        var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}')
+        var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}')
+        var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}')
+
+        dag_run_a = DagRun(
+            dag_id="dag_a",
+            run_id="dag_a_run_a_week_ago",
+            execution_date=datetime.now(timezone.utc) - timedelta(days=7),
+            state="success",
+            run_type="manual",
+        )
+        dag_run_b = DagRun(
+            dag_id="dag_b",
+            run_id="dag_b_run_yesterday",
+            execution_date=datetime.now(timezone.utc) - timedelta(days=1),
+            state="failed",
+            run_type="manual",
+        )
+        dag_run_c = DagRun(
+            dag_id="dag_c",
+            run_id="dag_c_run_on_hour_ago",
+            execution_date=datetime.now(timezone.utc) - timedelta(hours=1),
+            state="running",
+            run_type="manual",
+        )
+
+        session.add(var1)
+        session.add(var2)
+        session.add(var3)
+        session.add(dag_run_a)
+        session.add(dag_run_b)
+        session.add(dag_run_c)
+        session.commit()
+
+        yield session
+
+        session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete()
+        session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete()
+        session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete()
+
+        session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete()
+        session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete()
+        session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete()
+        session.commit()
+
+
+@pytest.mark.integration
+def test_delete_unused_dbt_ls_cache_deletes_a_week_ago_cache(vars_session):
+    assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first()
+    assert delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5), session=vars_session) == 1
+    assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first()
+
+
+@pytest.mark.integration
+def test_delete_unused_dbt_ls_cache_deletes_all_cache_five_minutes_ago(vars_session):
+    assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first()
+    assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_b").first()
+    assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").first()
+    assert delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=5), session=vars_session) == 3
+    assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first()
+    assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_b").first()
+    assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").first()