Speed up `LoadMode.DBT_LS` by caching dbt ls output in Airflow Variable (#1014)

This change significantly improves `LoadMode.DBT_LS` performance. The example DAGs tested reduced task queueing time from ~30s to ~0.5s, and the total DAG run time for Jaffle Shop dropped from 1min 25s to 40s (more than 50%). Some users [reported improvements of 84%](#1014 (comment)) in DAG run time when trying out these changes. The difference can be even larger on bigger dbt projects.

The improvement was accomplished by caching the dbt ls output as an Airflow Variable. This is an alternative to #992, where we cached the pickled DAG/TaskGroup into a local file on the Airflow node. Unlike #992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal performance on every run: whenever the cache is regenerated, the scheduler or DAG processor will experience a delay. It was also observed that the cache key value could change across platforms (e.g., `Darwin` and `Linux`), so deployments with a heterogeneous OS mix may regenerate the cache often.

Closes: #990
Closes: #1061

**Enabling/disabling this feature**

This feature is enabled by default. Users can disable it by setting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`.

**How the cache is refreshed**

Users can purge or delete the cache via the Airflow UI by identifying and deleting the cache key. The cache is also refreshed automatically whenever any file of the dbt project changes. Changes are detected using the SHA256 of all the files in the directory (see the sketch after the example below). Initially, this feature was implemented using the files' modified timestamps, but this did not work well for some Airflow deployments (e.g., `astro --dags`, since the timestamps changed during deployment).

Additionally, if any of the following DAG configurations are changed, we automatically purge the cache of the DAGs that use that specific configuration:

* `ProjectConfig.dbt_vars`
* `ProjectConfig.env_vars`
* `ProjectConfig.partial_parse`
* `RenderConfig.env_vars`
* `RenderConfig.exclude`
* `RenderConfig.select`
* `RenderConfig.selector`

The following argument was introduced so users can define Airflow variables that refresh the cache (it expects a list of Airflow variable names):

* `RenderConfig.airflow_vars_to_purge_cache`

Example:
```
RenderConfig(
    airflow_vars_to_purge_cache=["refresh_cache"]
)
```
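The file-change detection mentioned above can be illustrated with a short sketch: hash every file under the project directory and combine the digests into a single fingerprint. The function name and traversal details below are assumptions for illustration, not the actual Cosmos implementation:

```python
import hashlib
from pathlib import Path


def hash_dbt_project(project_dir: str) -> str:
    """Illustrative only: combine the SHA256 of every file under the project directory."""
    digest = hashlib.sha256()
    # Sort paths so the combined hash is deterministic across runs.
    for path in sorted(Path(project_dir).rglob("*")):
        if path.is_file():
            digest.update(hashlib.sha256(path.read_bytes()).hexdigest().encode("utf-8"))
    return digest.hexdigest()


# If this fingerprint differs from the version stored alongside the cached dbt ls
# output, the cache would be considered stale and regenerated.
print(hash_dbt_project("jaffle_shop"))
```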
**Cache key**

The Airflow Variables that represent the dbt ls cache are prefixed by `cosmos_cache`. When using `DbtDag`, the key uses the DAG name. When using `DbtTaskGroup`, it considers the TaskGroup, any parent task groups, and the DAG. Examples:

1. The `DbtDag` "basic_cosmos_dag" will have its cache represented by `"cosmos_cache__basic_cosmos_dag"`.
2. The `DbtTaskGroup` "customers" declared inside the DAG "basic_cosmos_task_group" will have the cache key `"cosmos_cache__basic_cosmos_task_group__customers"`.

**Cache value**

The cache value contains a few properties:

- `last_modified`: timestamp, represented in ISO 8601 format.
- `version`: a hash that represents the version of the dbt project and the arguments used to run dbt ls at the time the cache was created.
- `dbt_ls_compressed`: the dbt ls output compressed using zlib and encoded to base64, so it can be stored as a string in the Airflow metadata database.

Steps used to compress:
```
compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
encoded_data = base64.b64encode(compressed_data)
dbt_ls_compressed = encoded_data.decode("utf-8")
```

We compress this value because it can be significant for larger dbt projects, depending on the selectors used, and we wanted this approach to be safe and not clutter the Airflow metadata database.

Some numbers on the compression:

* A dbt project with 100 models can lead to a dbt ls output of 257k characters when using JSON. Zlib can compress it by 20x.
* Another [real-life dbt project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads) with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It reduces to 489 KB after being compressed using `zlib` and encoded using `base64`, i.e., 6% of the original size.
* Maximum cell size in Postgres: 20 MB.

The latency added by compression is on the order of milliseconds, so it does not interfere with the performance of this solution.
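For completeness, the inverse of the compression steps above restores the original dbt ls output. This is a self-contained sketch with made-up example data, not Cosmos's internal code:

```python
import base64
import json
import zlib

# Assumed example value; in practice the string would come from the cached Airflow Variable.
dbt_ls_output = json.dumps({"name": "stg_customers", "resource_type": "model"})
dbt_ls_compressed = base64.b64encode(zlib.compress(dbt_ls_output.encode("utf-8"))).decode("utf-8")

# Inverse steps: base64-decode, then zlib-decompress, back to the original dbt ls output.
decoded = base64.b64decode(dbt_ls_compressed.encode("utf-8"))
restored = zlib.decompress(decoded).decode("utf-8")
assert restored == dbt_ls_output
```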
**Future work**

* How this will affect the Airflow db in the long term.
* How does this performance compare to `ObjectStorage`?

**Example of results before and after this change**

Task queue times in Astro before the change:

<img width="1488" alt="Screenshot 2024-06-03 at 11 15 26" src="https://github.com/astronomer/astronomer-cosmos/assets/272048/20f6ae8f-02e0-4974-b445-740925ab1b3c">

Task queue times in Astro after the change, on the second run of the DAG:

<img width="1624" alt="Screenshot 2024-06-03 at 11 15 44" src="https://github.com/astronomer/astronomer-cosmos/assets/272048/c7b8a821-8751-4d2c-8feb-1d0c9bbba97e">

This feature will be available in `astronomer-cosmos==1.5.0a8`.
Showing 22 changed files with 994 additions and 63 deletions.
```
@@ -116,6 +116,7 @@ jobs:
      - uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha || github.ref }}

      - uses: actions/cache@v3
        with:
          path: |
@@ -139,6 +140,7 @@ jobs:
          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup
          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
        env:
          AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:[email protected]:5432/postgres
          DATABRICKS_HOST: mock
```