From 111d430b0da0c4794df9a98eefc97207036f9af6 Mon Sep 17 00:00:00 2001
From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>
Date: Thu, 3 Oct 2024 21:43:18 +0530
Subject: [PATCH] Experimental BQ support to run dbt models with `ExecutionMode.AIRFLOW_ASYNC` (#1230)

Enable BQ users to run dbt models (`full_refresh`) asynchronously. This releases the Airflow worker node from waiting while the transformation (I/O) happens in the data warehouse, increasing the overall Airflow task throughput (more information: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html).

As part of this change, we introduce the capability of running the actual SQL transformations without using the dbt command. This also avoids creating subprocesses in the worker node (`ExecutionMode.LOCAL` with `InvocationMode.SUBPROCESS` and `ExecutionMode.VIRTUALENV`) or the overhead of creating a Kubernetes Pod to execute the actual dbt command (`ExecutionMode.KUBERNETES`). This can avoid issues related to memory and CPU usage.

This PR takes advantage of an async operator already implemented in the Airflow repo by extending it in the Cosmos async operator. It also utilizes the pre-compiled SQL generated as part of PR https://github.com/astronomer/astronomer-cosmos/pull/1224. It downloads the generated SQL from a remote location (S3/GCS), which allows us to decouple from dbt during task execution.

## Details

- Expose `get_profile_type` on `ProfileConfig`: this aids in database selection
- ~Add `async_op_args`: a high-level parameter to forward arguments to the upstream operator (Airflow operator). (This may change in this PR itself)~ The async operator params are processed as kwargs in the `operator_args` parameter
- Implement `DbtRunAirflowAsyncOperator`: this initializes the Airflow operator, retrieves the SQL query at task runtime from a remote location, modifies the query as needed, and triggers the upstream `execute` method

## Limitations

- This feature only works with Airflow 2.8 and above
- The async execution only works for BigQuery
- The async execution only supports running dbt models (other dbt resources, such as seeds, sources, snapshots, and tests, are run using `ExecutionMode.LOCAL`)
- This works only if the user sets `full_refresh=True` in `operator_args` (which means tables will be dropped before being populated, as implemented in `dbt-core`)
- Users need to use `ProfileMapping` in `ProfileConfig`, since Cosmos relies on having the connection (credentials) to be able to run the transformation in BQ without `dbt-core`
- Users must provide the BQ `location` in `operator_args` (this is a limitation of the `BigQueryInsertJobOperator` that is being used to implement the native Airflow asynchronous support)

## Testing

We have added a new dbt project to the repository to facilitate asynchronous task execution. The goal is to accelerate development without disrupting or requiring fixes for the existing tests. We have also added a DAG for end-to-end testing: https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Configuration

Users need to configure the parameters below to execute deferrable tasks in Cosmos (a minimal sketch follows this list):

- [ExecutionMode: AIRFLOW_ASYNC](https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html)
- [remote_target_path](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path)
- [remote_target_path_conn_id](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path-conn-id)

Example DAG: https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py
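For illustration, below is a minimal sketch of such a DAG, modelled on the example DAG above. The connection ID, GCP project, dataset, dbt project path, and BigQuery location are placeholders, and it assumes the `[cosmos]` settings `remote_target_path` and `remote_target_path_conn_id` are already configured (for example via environment variables).

```python
# Minimal sketch of an ExecutionMode.AIRFLOW_ASYNC DAG (all values are placeholders).
# Assumes AIRFLOW__COSMOS__REMOTE_TARGET_PATH and
# AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID are set so compiled SQL can be
# uploaded to and downloaded from remote storage.
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    # A ProfileMapping is required so Cosmos can run the BigQuery job without dbt-core.
    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_conn",  # placeholder Airflow connection ID
        profile_args={"dataset": "my_dataset", "project": "my-gcp-project"},
    ),
)

async_dag = DbtDag(
    project_config=ProjectConfig(Path("/usr/local/airflow/dbt/jaffle_shop")),
    profile_config=profile_config,
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
    # full_refresh=True and a BigQuery location are currently required by the async mode.
    operator_args={"full_refresh": True, "location": "us"},
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="my_async_dag",
)
```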
## Installation

You can leverage async operator support by installing an additional dependency:

```
astronomer-cosmos[dbt-bigquery, google]
```

## Documentation

The PR also documents the limitations and usage of Airflow async execution in Cosmos.

## Related Issue(s)

Related to: https://github.com/astronomer/astronomer-cosmos/issues/1120
Closes: #1134

## Breaking Change?

No

## Notes

This is an experimental feature, and as such, it may undergo breaking changes. We encourage users to share their experiences and feedback to improve it further. We'd love support and feedback so we can define the next steps.

## Checklist

- [x] I have made corresponding changes to the documentation (if required)
- [x] I have added tests that prove my fix is effective or that my feature works

## Credits

This was a result of teamwork and effort:

Co-authored-by: Pankaj Koti
Co-authored-by: Tatiana Al-Chueyr

## Future Work

- Design an interface to facilitate the easy addition of new asynchronous database operators: https://github.com/astronomer/astronomer-cosmos/issues/1238
- Improve the test coverage: https://github.com/astronomer/astronomer-cosmos/issues/1239
- Address the limitations (we need to log these issues)

---------

Co-authored-by: Pankaj Koti
Co-authored-by: Tatiana Al-Chueyr
---
 .gitignore | 3 -
 cosmos/__init__.py | 3 +-
 cosmos/airflow/graph.py | 33 ++-
 cosmos/config.py | 16 ++
 cosmos/core/airflow.py | 3 +-
 cosmos/dbt/graph.py | 1 -
 cosmos/operators/airflow_async.py | 179 +++++++++++++---
 cosmos/operators/local.py | 24 +--
 cosmos/settings.py | 1 -
 dev/Dockerfile | 2 +-
 dev/dags/dbt/original_jaffle_shop/.gitignore | 5 +
 dev/dags/dbt/original_jaffle_shop/LICENSE | 201 ++++++++++++++++++
 dev/dags/dbt/original_jaffle_shop/README.md | 11 +
 .../dbt/original_jaffle_shop/dbt_project.yml | 26 +++
 .../macros/drop_table.sql | 6 +
 .../original_jaffle_shop/models/customers.sql | 69 ++++++
 .../dbt/original_jaffle_shop/models/docs.md | 14 ++
 .../original_jaffle_shop/models/orders.sql | 56 +++++
 .../original_jaffle_shop/models/overview.md | 11 +
 .../original_jaffle_shop/models/schema.yml | 82 +++++++
 .../models/staging/schema.yml | 31 +++
 .../models/staging/stg_customers.sql | 22 ++
 .../models/staging/stg_orders.sql | 23 ++
 .../models/staging/stg_payments.sql | 25 +++
 .../seeds/raw_customers.csv | 101 +++++++++
 .../original_jaffle_shop/seeds/raw_orders.csv | 100 +++++++++
 .../seeds/raw_payments.csv | 114 ++++++++++
 dev/dags/simple_dag_async.py | 19 +-
 docs/getting_started/execution-modes.rst | 47 +++-
 scripts/test/integration-dbt-1-5-4.sh | 2 +-
 scripts/test/integration-setup.sh | 2 +-
 tests/airflow/test_graph.py | 55 +++--
 tests/dbt/test_graph.py | 2 +-
 tests/operators/test_airflow_async.py | 36 +---
 tests/operators/test_local.py | 3 +-
tests/test_example_dags.py | 12 +- tests/test_example_dags_no_connections.py | 4 + 37 files changed, 1226 insertions(+), 118 deletions(-) create mode 100644 dev/dags/dbt/original_jaffle_shop/.gitignore create mode 100644 dev/dags/dbt/original_jaffle_shop/LICENSE create mode 100644 dev/dags/dbt/original_jaffle_shop/README.md create mode 100644 dev/dags/dbt/original_jaffle_shop/dbt_project.yml create mode 100644 dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/models/customers.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/models/docs.md create mode 100644 dev/dags/dbt/original_jaffle_shop/models/orders.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/models/overview.md create mode 100644 dev/dags/dbt/original_jaffle_shop/models/schema.yml create mode 100644 dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml create mode 100644 dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql create mode 100644 dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv create mode 100644 dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv create mode 100644 dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv diff --git a/.gitignore b/.gitignore index 6f6111510..5991c231c 100644 --- a/.gitignore +++ b/.gitignore @@ -160,6 +160,3 @@ webserver_config.py # VI *.sw[a-z] - -# Ignore possibly created symlink to `dev/dags` for running `airflow dags test` command. -dags diff --git a/cosmos/__init__.py b/cosmos/__init__.py index f76f18b7c..ad4c9ee35 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,8 @@ Contains dags, task groups, and operators. """ -__version__ = "1.6.0" + +__version__ = "1.7.0a1" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 9de21292e..f507b03ac 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -132,6 +132,7 @@ def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], + dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, ) -> TaskMetadata | None: @@ -142,6 +143,7 @@ def create_task_metadata( :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES). Default is ExecutionMode.LOCAL. :param args: Arguments to be used to instantiate an Airflow Task + :param dbt_dag_task_group_identifier: Identifier to refer to the DbtDAG or DbtTaskGroup in the DAG. :param use_task_group: It determines whether to use the name as a prefix for the task id or not. If it is False, then use the name as a prefix for the task id, otherwise do not. :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. 
@@ -156,7 +158,10 @@ def create_task_metadata( args = {**args, **{"models": node.resource_name}} if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: - extra_context = {"dbt_node_config": node.context_dict} + extra_context = { + "dbt_node_config": node.context_dict, + "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + } if node.resource_type == DbtResourceType.MODEL: task_id = f"{node.name}_run" if use_task_group is True: @@ -226,6 +231,7 @@ def generate_task_or_group( node=node, execution_mode=execution_mode, args=task_args, + dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, ) @@ -268,14 +274,28 @@ def _add_dbt_compile_task( id=DBT_COMPILE_TASK_ID, operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator", arguments=task_args, - extra_context={}, + extra_context={"dbt_dag_task_group_identifier": _get_dbt_dag_task_group_identifier(dag, task_group)}, ) compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=task_group) + + for task_id, task in tasks_map.items(): + if not task.upstream_list: + compile_airflow_task >> task + tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task - for node_id, node in nodes.items(): - if not node.depends_on and node_id in tasks_map: - tasks_map[DBT_COMPILE_TASK_ID] >> tasks_map[node_id] + +def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -> str: + dag_id = dag.dag_id + task_group_id = task_group.group_id if task_group else None + identifiers_list = [] + if dag_id: + identifiers_list.append(dag_id) + if task_group_id: + identifiers_list.append(task_group_id) + dag_task_group_identifier = "__".join(identifiers_list) + + return dag_task_group_identifier def build_airflow_graph( @@ -358,9 +378,8 @@ def build_airflow_graph( for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task - _add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group) - create_airflow_task_dependencies(nodes, tasks_map) + _add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group) def create_airflow_task_dependencies( diff --git a/cosmos/config.py b/cosmos/config.py index 2cebbf3cc..ccda2c432 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import Any, Callable, Iterator +import yaml from airflow.version import version as airflow_version from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled @@ -286,6 +287,21 @@ def validate_profiles_yml(self) -> None: if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists(): raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.") + def get_profile_type(self) -> str: + if isinstance(self.profile_mapping, BaseProfileMapping): + return str(self.profile_mapping.dbt_profile_type) + + profile_path = self._get_profile_path() + + with open(profile_path) as file: + profiles = yaml.safe_load(file) + + profile = profiles[self.profile_name] + target_type = profile["outputs"][self.target_name]["type"] + return str(target_type) + + return "undefined" + def _get_profile_path(self, use_mock_values: bool = False) -> Path: """ Handle the profile caching mechanism. 
diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 1bdce9361..6f1064649 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -1,6 +1,7 @@ from __future__ import annotations import importlib +from typing import Any from airflow.models import BaseOperator from airflow.models.dag import DAG @@ -27,7 +28,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) module = importlib.import_module(module_name) Operator = getattr(module, class_name) - task_kwargs = {} + task_kwargs: dict[str, Any] = {} if task.owner != "": task_kwargs["owner"] = task.owner diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 1c0237e8f..7a957b2fc 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -467,7 +467,6 @@ def should_use_dbt_ls_cache(self) -> bool: def load_via_dbt_ls_cache(self) -> bool: """(Try to) load dbt ls cache from an Airflow Variable""" - logger.info(f"Trying to parse the dbt project using dbt ls cache {self.dbt_ls_cache_key}...") if self.should_use_dbt_ls_cache(): project_path = self.project_path diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index 05f762702..a7f30a330 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -1,67 +1,190 @@ +from __future__ import annotations + +import inspect +from pathlib import Path +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from airflow.utils.context import Context + +from cosmos import settings +from cosmos.config import ProfileConfig +from cosmos.exceptions import CosmosValueError +from cosmos.operators.base import AbstractDbtBaseOperator from cosmos.operators.local import ( DbtBuildLocalOperator, DbtCompileLocalOperator, - DbtDocsAzureStorageLocalOperator, - DbtDocsGCSLocalOperator, - DbtDocsLocalOperator, - DbtDocsS3LocalOperator, + DbtLocalBaseOperator, DbtLSLocalOperator, - DbtRunLocalOperator, DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, DbtSourceLocalOperator, DbtTestLocalOperator, ) +from cosmos.settings import remote_target_path, remote_target_path_conn_id +_SUPPORTED_DATABASES = ["bigquery"] -class DbtBuildAirflowAsyncOperator(DbtBuildLocalOperator): - pass +from abc import ABCMeta - -class DbtLSAirflowAsyncOperator(DbtLSLocalOperator): - pass +from airflow.models.baseoperator import BaseOperator -class DbtSeedAirflowAsyncOperator(DbtSeedLocalOperator): - pass - - -class DbtSnapshotAirflowAsyncOperator(DbtSnapshotLocalOperator): - pass - - -class DbtSourceAirflowAsyncOperator(DbtSourceLocalOperator): - pass +class DbtBaseAirflowAsyncOperator(BaseOperator, metaclass=ABCMeta): + def __init__(self, **kwargs) -> None: # type: ignore + self.location = kwargs.pop("location") + self.configuration = kwargs.pop("configuration", {}) + super().__init__(**kwargs) -class DbtRunAirflowAsyncOperator(DbtRunLocalOperator): +class DbtBuildAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtBuildLocalOperator): # type: ignore pass -class DbtTestAirflowAsyncOperator(DbtTestLocalOperator): +class DbtLSAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtLSLocalOperator): # type: ignore pass -class DbtRunOperationAirflowAsyncOperator(DbtRunOperationLocalOperator): +class DbtSeedAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSeedLocalOperator): # type: ignore pass -class DbtDocsAirflowAsyncOperator(DbtDocsLocalOperator): 
+class DbtSnapshotAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSnapshotLocalOperator): # type: ignore pass -class DbtDocsS3AirflowAsyncOperator(DbtDocsS3LocalOperator): +class DbtSourceAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSourceLocalOperator): # type: ignore pass -class DbtDocsAzureStorageAirflowAsyncOperator(DbtDocsAzureStorageLocalOperator): +class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator): # type: ignore + + template_fields: Sequence[str] = ( + "full_refresh", + "project_dir", + "gcp_project", + "dataset", + "location", + ) + + def __init__( # type: ignore + self, + project_dir: str, + profile_config: ProfileConfig, + location: str, # This is a mandatory parameter when using BigQueryInsertJobOperator with deferrable=True + full_refresh: bool = False, + extra_context: dict[str, object] | None = None, + configuration: dict[str, object] | None = None, + **kwargs, + ) -> None: + # dbt task param + self.project_dir = project_dir + self.extra_context = extra_context or {} + self.full_refresh = full_refresh + self.profile_config = profile_config + if not self.profile_config or not self.profile_config.profile_mapping: + raise CosmosValueError(f"Cosmos async support is only available when using ProfileMapping") + + self.profile_type: str = profile_config.get_profile_type() # type: ignore + if self.profile_type not in _SUPPORTED_DATABASES: + raise CosmosValueError(f"Async run are only supported: {_SUPPORTED_DATABASES}") + + # airflow task param + self.location = location + self.configuration = configuration or {} + self.gcp_conn_id = self.profile_config.profile_mapping.conn_id # type: ignore + profile = self.profile_config.profile_mapping.profile + self.gcp_project = profile["project"] + self.dataset = profile["dataset"] + + # Cosmos attempts to pass many kwargs that BigQueryInsertJobOperator simply does not accept. + # We need to pop them. 
+ clean_kwargs = {} + non_async_args = set(inspect.signature(AbstractDbtBaseOperator.__init__).parameters.keys()) + non_async_args |= set(inspect.signature(DbtLocalBaseOperator.__init__).parameters.keys()) + non_async_args -= {"task_id"} + + for arg_key, arg_value in kwargs.items(): + if arg_key not in non_async_args: + clean_kwargs[arg_key] = arg_value + + # The following are the minimum required parameters to run BigQueryInsertJobOperator using the deferrable mode + super().__init__( + gcp_conn_id=self.gcp_conn_id, + configuration=self.configuration, + location=self.location, + deferrable=True, + **clean_kwargs, + ) + + def get_remote_sql(self) -> str: + if not settings.AIRFLOW_IO_AVAILABLE: + raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.") + from airflow.io.path import ObjectStoragePath + + file_path = self.extra_context["dbt_node_config"]["file_path"] # type: ignore + dbt_dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"] + + remote_target_path_str = str(remote_target_path).rstrip("/") + + if TYPE_CHECKING: + assert self.project_dir is not None + + project_dir_parent = str(Path(self.project_dir).parent) + relative_file_path = str(file_path).replace(project_dir_parent, "").lstrip("/") + remote_model_path = f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/compiled/{relative_file_path}" + + object_storage_path = ObjectStoragePath(remote_model_path, conn_id=remote_target_path_conn_id) + with object_storage_path.open() as fp: # type: ignore + return fp.read() # type: ignore + + def drop_table_sql(self) -> None: + model_name = self.extra_context["dbt_node_config"]["resource_name"] # type: ignore + sql = f"DROP TABLE IF EXISTS {self.gcp_project}.{self.dataset}.{model_name};" + + hook = BigQueryHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.configuration = { + "query": { + "query": sql, + "useLegacySql": False, + } + } + hook.insert_job(configuration=self.configuration, location=self.location, project_id=self.gcp_project) + + def execute(self, context: Context) -> Any | None: + if not self.full_refresh: + raise CosmosValueError("The async execution only supported for full_refresh") + else: + # It may be surprising to some, but the dbt-core --full-refresh argument fully drops the table before populating it + # https://github.com/dbt-labs/dbt-core/blob/5e9f1b515f37dfe6cdae1ab1aa7d190b92490e24/core/dbt/context/base.py#L662-L666 + # https://docs.getdbt.com/reference/resource-configs/full_refresh#recommendation + # We're emulating this behaviour here + self.drop_table_sql() + sql = self.get_remote_sql() + model_name = self.extra_context["dbt_node_config"]["resource_name"] # type: ignore + # prefix explicit create command to create table + sql = f"CREATE TABLE {self.gcp_project}.{self.dataset}.{model_name} AS {sql}" + self.configuration = { + "query": { + "query": sql, + "useLegacySql": False, + } + } + return super().execute(context) + + +class DbtTestAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtTestLocalOperator): # type: ignore pass -class DbtDocsGCSAirflowAsyncOperator(DbtDocsGCSLocalOperator): +class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOperationLocalOperator): # type: ignore pass -class DbtCompileAirflowAsyncOperator(DbtCompileLocalOperator): +class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator): # type: ignore pass diff --git a/cosmos/operators/local.py 
b/cosmos/operators/local.py index db5993609..05fa356f6 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -32,7 +32,7 @@ from cosmos.dataset import get_dataset_alias_name from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError, CosmosValueError -from cosmos.settings import AIRFLOW_IO_AVAILABLE, remote_target_path, remote_target_path_conn_id +from cosmos.settings import remote_target_path, remote_target_path_conn_id try: from airflow.datasets import Dataset @@ -294,7 +294,7 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: if remote_conn_id is None: return None, None - if not AIRFLOW_IO_AVAILABLE: + if not settings.AIRFLOW_IO_AVAILABLE: raise CosmosValueError( f"You're trying to specify remote target path {target_path_str}, but the required " f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " @@ -311,23 +311,16 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: return _configured_target_path, remote_conn_id def _construct_dest_file_path( - self, dest_target_dir: Path, file_path: str, source_compiled_dir: Path, context: Context + self, + dest_target_dir: Path, + file_path: str, + source_compiled_dir: Path, ) -> str: """ Construct the destination path for the compiled SQL files to be uploaded to the remote store. """ dest_target_dir_str = str(dest_target_dir).rstrip("/") - - task = context["task"] - dag_id = task.dag_id - task_group_id = task.task_group.group_id if task.task_group else None - identifiers_list = [] - if dag_id: - identifiers_list.append(dag_id) - if task_group_id: - identifiers_list.append(task_group_id) - dag_task_group_identifier = "__".join(identifiers_list) - + dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"] rel_path = os.path.relpath(file_path, source_compiled_dir).lstrip("/") return f"{dest_target_dir_str}/{dag_task_group_identifier}/compiled/{rel_path}" @@ -340,6 +333,7 @@ def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: return dest_target_dir, dest_conn_id = self._configure_remote_target_path() + if not dest_target_dir: raise CosmosValueError( "You're trying to upload compiled SQL files, but the remote target path is not configured. " @@ -350,7 +344,7 @@ def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: source_compiled_dir = Path(tmp_project_dir) / "target" / "compiled" files = [str(file) for file in source_compiled_dir.rglob("*") if file.is_file()] for file_path in files: - dest_file_path = self._construct_dest_file_path(dest_target_dir, file_path, source_compiled_dir, context) + dest_file_path = self._construct_dest_file_path(dest_target_dir, file_path, source_compiled_dir) dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id) ObjectStoragePath(file_path).copy(dest_object_storage_path) self.log.debug("Copied %s to %s", file_path, dest_object_storage_path) diff --git a/cosmos/settings.py b/cosmos/settings.py index 2cae79968..7bcf04bb9 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -34,7 +34,6 @@ # This will be merged with the `cache_dir` config parameter in upcoming releases. 
remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None) remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None) - remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None) remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None) diff --git a/dev/Dockerfile b/dev/Dockerfile index a17bb9943..9fd3df75a 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -17,7 +17,7 @@ COPY ./README.rst ${AIRFLOW_HOME}/astronomer_cosmos/ COPY ./cosmos/ ${AIRFLOW_HOME}/astronomer_cosmos/cosmos/ COPY ./dev/requirements.txt ${AIRFLOW_HOME}/requirements.txt # install the package in editable mode -RUN uv pip install --system -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks] && \ +RUN uv pip install --system -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks,dbt-bigquery] && \ uv pip install --system -r ${AIRFLOW_HOME}/requirements.txt diff --git a/dev/dags/dbt/original_jaffle_shop/.gitignore b/dev/dags/dbt/original_jaffle_shop/.gitignore new file mode 100644 index 000000000..45d294b9a --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/.gitignore @@ -0,0 +1,5 @@ + +target/ +dbt_packages/ +logs/ +!target/manifest.json diff --git a/dev/dags/dbt/original_jaffle_shop/LICENSE b/dev/dags/dbt/original_jaffle_shop/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dev/dags/dbt/original_jaffle_shop/README.md b/dev/dags/dbt/original_jaffle_shop/README.md new file mode 100644 index 000000000..d4ce46446 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/README.md @@ -0,0 +1,11 @@ +## `jaffle_shop` + +`jaffle_shop` is a fictional ecommerce store. This dbt project transforms raw data from an app database into a customers and orders model ready for analytics. + +See [dbt's documentation](https://github.com/dbt-labs/jaffle_shop) for more info. + +### Modifications + +This project has been modified from the original to highlight some of the features of Cosmos. 
Namely: + +- tags have been added to the models diff --git a/dev/dags/dbt/original_jaffle_shop/dbt_project.yml b/dev/dags/dbt/original_jaffle_shop/dbt_project.yml new file mode 100644 index 000000000..42767c5ea --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/dbt_project.yml @@ -0,0 +1,26 @@ +name: 'original_jaffle_shop' + +config-version: 2 +version: '0.1' + +profile: 'jaffle_shop' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] + +models: + jaffle_shop: + materialized: table + staging: + materialized: view diff --git a/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql b/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql new file mode 100644 index 000000000..37a8b21d7 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql @@ -0,0 +1,6 @@ +{%- macro drop_table_by_name(table_name) -%} + {%- set drop_query -%} + DROP TABLE IF EXISTS {{ target.schema }}.{{ table_name }} CASCADE + {%- endset -%} + {% do run_query(drop_query) %} +{%- endmacro -%} diff --git a/dev/dags/dbt/original_jaffle_shop/models/customers.sql b/dev/dags/dbt/original_jaffle_shop/models/customers.sql new file mode 100644 index 000000000..016a004fe --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/customers.sql @@ -0,0 +1,69 @@ +with customers as ( + + select * from {{ ref('stg_customers') }} + +), + +orders as ( + + select * from {{ ref('stg_orders') }} + +), + +payments as ( + + select * from {{ ref('stg_payments') }} + +), + +customer_orders as ( + + select + customer_id, + + min(order_date) as first_order, + max(order_date) as most_recent_order, + count(order_id) as number_of_orders + from orders + + group by customer_id + +), + +customer_payments as ( + + select + orders.customer_id, + sum(amount) as total_amount + + from payments + + left join orders on + payments.order_id = orders.order_id + + group by orders.customer_id + +), + +final as ( + + select + customers.customer_id, + customers.first_name, + customers.last_name, + customer_orders.first_order, + customer_orders.most_recent_order, + customer_orders.number_of_orders, + customer_payments.total_amount as customer_lifetime_value + + from customers + + left join customer_orders + on customers.customer_id = customer_orders.customer_id + + left join customer_payments + on customers.customer_id = customer_payments.customer_id + +) + +select * from final diff --git a/dev/dags/dbt/original_jaffle_shop/models/docs.md b/dev/dags/dbt/original_jaffle_shop/models/docs.md new file mode 100644 index 000000000..c6ae93be0 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/docs.md @@ -0,0 +1,14 @@ +{% docs orders_status %} + +Orders can be one of the following statuses: + +| status | description | +|----------------|------------------------------------------------------------------------------------------------------------------------| +| placed | The order has been placed but has not yet left the warehouse | +| shipped | The order has ben shipped to the customer and is currently in transit | +| completed | The order has been received by the customer | +| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse | +| returned | The order has been returned by the customer and received at the warehouse | + + +{% enddocs %} diff --git 
a/dev/dags/dbt/original_jaffle_shop/models/orders.sql b/dev/dags/dbt/original_jaffle_shop/models/orders.sql new file mode 100644 index 000000000..cbb293491 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/orders.sql @@ -0,0 +1,56 @@ +{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %} + +with orders as ( + + select * from {{ ref('stg_orders') }} + +), + +payments as ( + + select * from {{ ref('stg_payments') }} + +), + +order_payments as ( + + select + order_id, + + {% for payment_method in payment_methods -%} + sum(case when payment_method = '{{ payment_method }}' then amount else 0 end) as {{ payment_method }}_amount, + {% endfor -%} + + sum(amount) as total_amount + + from payments + + group by order_id + +), + +final as ( + + select + orders.order_id, + orders.customer_id, + orders.order_date, + orders.status, + + {% for payment_method in payment_methods -%} + + order_payments.{{ payment_method }}_amount, + + {% endfor -%} + + order_payments.total_amount as amount + + from orders + + + left join order_payments + on orders.order_id = order_payments.order_id + +) + +select * from final diff --git a/dev/dags/dbt/original_jaffle_shop/models/overview.md b/dev/dags/dbt/original_jaffle_shop/models/overview.md new file mode 100644 index 000000000..0544c42b1 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/overview.md @@ -0,0 +1,11 @@ +{% docs __overview__ %} + +## Data Documentation for Jaffle Shop + +`jaffle_shop` is a fictional ecommerce store. + +This [dbt](https://www.getdbt.com/) project is for testing out code. + +The source code can be found [here](https://github.com/clrcrl/jaffle_shop). + +{% enddocs %} diff --git a/dev/dags/dbt/original_jaffle_shop/models/schema.yml b/dev/dags/dbt/original_jaffle_shop/models/schema.yml new file mode 100644 index 000000000..381349cfd --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/schema.yml @@ -0,0 +1,82 @@ +version: 2 + +models: + - name: customers + description: This table has basic information about a customer, as well as some derived facts based on a customer's orders + + columns: + - name: customer_id + description: This is a unique identifier for a customer + tests: + - unique + - not_null + + - name: first_name + description: Customer's first name. PII. + + - name: last_name + description: Customer's last name. PII. 
+ + - name: first_order + description: Date (UTC) of a customer's first order + + - name: most_recent_order + description: Date (UTC) of a customer's most recent order + + - name: number_of_orders + description: Count of the number of orders a customer has placed + + - name: total_order_amount + description: Total value (AUD) of a customer's orders + + - name: orders + description: This table has basic information about orders, as well as some derived facts based on payments + + columns: + - name: order_id + tests: + - unique + - not_null + description: This is a unique identifier for an order + + - name: customer_id + description: Foreign key to the customers table + tests: + - not_null + - relationships: + to: ref('customers') + field: customer_id + + - name: order_date + description: Date (UTC) that the order was placed + + - name: status + description: '{{ doc("orders_status") }}' + tests: + - accepted_values: + values: ['placed', 'shipped', 'completed', 'return_pending', 'returned'] + + - name: amount + description: Total amount (AUD) of the order + tests: + - not_null + + - name: credit_card_amount + description: Amount of the order (AUD) paid for by credit card + tests: + - not_null + + - name: coupon_amount + description: Amount of the order (AUD) paid for by coupon + tests: + - not_null + + - name: bank_transfer_amount + description: Amount of the order (AUD) paid for by bank transfer + tests: + - not_null + + - name: gift_card_amount + description: Amount of the order (AUD) paid for by gift card + tests: + - not_null diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml b/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml new file mode 100644 index 000000000..c207e4cf5 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml @@ -0,0 +1,31 @@ +version: 2 + +models: + - name: stg_customers + columns: + - name: customer_id + tests: + - unique + - not_null + + - name: stg_orders + columns: + - name: order_id + tests: + - unique + - not_null + - name: status + tests: + - accepted_values: + values: ['placed', 'shipped', 'completed', 'return_pending', 'returned'] + + - name: stg_payments + columns: + - name: payment_id + tests: + - unique + - not_null + - name: payment_method + tests: + - accepted_values: + values: ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql new file mode 100644 index 000000000..cad047269 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql @@ -0,0 +1,22 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_customers') }} + +), + +renamed as ( + + select + id as customer_id, + first_name, + last_name + + from source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql new file mode 100644 index 000000000..a654dcb94 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql @@ -0,0 +1,23 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_orders') }} + +), + +renamed as ( + + select + id as order_id, + user_id as customer_id, + order_date, + status + + from 
source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql new file mode 100644 index 000000000..f718596ad --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql @@ -0,0 +1,25 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_payments') }} + +), + +renamed as ( + + select + id as payment_id, + order_id, + payment_method, + + -- `amount` is currently stored in cents, so we convert it to dollars + amount / 100 as amount + + from source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv new file mode 100644 index 000000000..b3e6747d6 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv @@ -0,0 +1,101 @@ +id,first_name,last_name +1,Michael,P. +2,Shawn,M. +3,Kathleen,P. +4,Jimmy,C. +5,Katherine,R. +6,Sarah,R. +7,Martin,M. +8,Frank,R. +9,Jennifer,F. +10,Henry,W. +11,Fred,S. +12,Amy,D. +13,Kathleen,M. +14,Steve,F. +15,Teresa,H. +16,Amanda,H. +17,Kimberly,R. +18,Johnny,K. +19,Virginia,F. +20,Anna,A. +21,Willie,H. +22,Sean,H. +23,Mildred,A. +24,David,G. +25,Victor,H. +26,Aaron,R. +27,Benjamin,B. +28,Lisa,W. +29,Benjamin,K. +30,Christina,W. +31,Jane,G. +32,Thomas,O. +33,Katherine,M. +34,Jennifer,S. +35,Sara,T. +36,Harold,O. +37,Shirley,J. +38,Dennis,J. +39,Louise,W. +40,Maria,A. +41,Gloria,C. +42,Diana,S. +43,Kelly,N. +44,Jane,R. +45,Scott,B. +46,Norma,C. +47,Marie,P. +48,Lillian,C. +49,Judy,N. +50,Billy,L. +51,Howard,R. +52,Laura,F. +53,Anne,B. +54,Rose,M. +55,Nicholas,R. +56,Joshua,K. +57,Paul,W. +58,Kathryn,K. +59,Adam,A. +60,Norma,W. +61,Timothy,R. +62,Elizabeth,P. +63,Edward,G. +64,David,C. +65,Brenda,W. +66,Adam,W. +67,Michael,H. +68,Jesse,E. +69,Janet,P. +70,Helen,F. +71,Gerald,C. +72,Kathryn,O. +73,Alan,B. +74,Harry,A. +75,Andrea,H. +76,Barbara,W. +77,Anne,W. +78,Harry,H. +79,Jack,R. +80,Phillip,H. +81,Shirley,H. +82,Arthur,D. +83,Virginia,R. +84,Christina,R. +85,Theresa,M. +86,Jason,C. +87,Phillip,B. +88,Adam,T. +89,Margaret,J. +90,Paul,P. +91,Todd,W. +92,Willie,O. +93,Frances,R. +94,Gregory,H. +95,Lisa,P. +96,Jacqueline,A. +97,Shirley,D. +98,Nicole,M. +99,Mary,G. +100,Jean,M. 
diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv new file mode 100644 index 000000000..c4870621b --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv @@ -0,0 +1,100 @@ +id,user_id,order_date,status +1,1,2018-01-01,returned +2,3,2018-01-02,completed +3,94,2018-01-04,completed +4,50,2018-01-05,completed +5,64,2018-01-05,completed +6,54,2018-01-07,completed +7,88,2018-01-09,completed +8,2,2018-01-11,returned +9,53,2018-01-12,completed +10,7,2018-01-14,completed +11,99,2018-01-14,completed +12,59,2018-01-15,completed +13,84,2018-01-17,completed +14,40,2018-01-17,returned +15,25,2018-01-17,completed +16,39,2018-01-18,completed +17,71,2018-01-18,completed +18,64,2018-01-20,returned +19,54,2018-01-22,completed +20,20,2018-01-23,completed +21,71,2018-01-23,completed +22,86,2018-01-24,completed +23,22,2018-01-26,return_pending +24,3,2018-01-27,completed +25,51,2018-01-28,completed +26,32,2018-01-28,completed +27,94,2018-01-29,completed +28,8,2018-01-29,completed +29,57,2018-01-31,completed +30,69,2018-02-02,completed +31,16,2018-02-02,completed +32,28,2018-02-04,completed +33,42,2018-02-04,completed +34,38,2018-02-06,completed +35,80,2018-02-08,completed +36,85,2018-02-10,completed +37,1,2018-02-10,completed +38,51,2018-02-10,completed +39,26,2018-02-11,completed +40,33,2018-02-13,completed +41,99,2018-02-14,completed +42,92,2018-02-16,completed +43,31,2018-02-17,completed +44,66,2018-02-17,completed +45,22,2018-02-17,completed +46,6,2018-02-19,completed +47,50,2018-02-20,completed +48,27,2018-02-21,completed +49,35,2018-02-21,completed +50,51,2018-02-23,completed +51,71,2018-02-24,completed +52,54,2018-02-25,return_pending +53,34,2018-02-26,completed +54,54,2018-02-26,completed +55,18,2018-02-27,completed +56,79,2018-02-28,completed +57,93,2018-03-01,completed +58,22,2018-03-01,completed +59,30,2018-03-02,completed +60,12,2018-03-03,completed +61,63,2018-03-03,completed +62,57,2018-03-05,completed +63,70,2018-03-06,completed +64,13,2018-03-07,completed +65,26,2018-03-08,completed +66,36,2018-03-10,completed +67,79,2018-03-11,completed +68,53,2018-03-11,completed +69,3,2018-03-11,completed +70,8,2018-03-12,completed +71,42,2018-03-12,shipped +72,30,2018-03-14,shipped +73,19,2018-03-16,completed +74,9,2018-03-17,shipped +75,69,2018-03-18,completed +76,25,2018-03-20,completed +77,35,2018-03-21,shipped +78,90,2018-03-23,shipped +79,52,2018-03-23,shipped +80,11,2018-03-23,shipped +81,76,2018-03-23,shipped +82,46,2018-03-24,shipped +83,54,2018-03-24,shipped +84,70,2018-03-26,placed +85,47,2018-03-26,shipped +86,68,2018-03-26,placed +87,46,2018-03-27,placed +88,91,2018-03-27,shipped +89,21,2018-03-28,placed +90,66,2018-03-30,shipped +91,47,2018-03-31,placed +92,84,2018-04-02,placed +93,66,2018-04-03,placed +94,63,2018-04-03,placed +95,27,2018-04-04,placed +96,90,2018-04-06,placed +97,89,2018-04-07,placed +98,41,2018-04-07,placed +99,85,2018-04-09,placed diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv new file mode 100644 index 000000000..a587baab5 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv @@ -0,0 +1,114 @@ +id,order_id,payment_method,amount +1,1,credit_card,1000 +2,2,credit_card,2000 +3,3,coupon,100 +4,4,coupon,2500 +5,5,bank_transfer,1700 +6,6,credit_card,600 +7,7,credit_card,1600 +8,8,credit_card,2300 +9,9,gift_card,2300 +10,9,bank_transfer,0 +11,10,bank_transfer,2600 +12,11,credit_card,2700 
+13,12,credit_card,100 +14,13,credit_card,500 +15,13,bank_transfer,1400 +16,14,bank_transfer,300 +17,15,coupon,2200 +18,16,credit_card,1000 +19,17,bank_transfer,200 +20,18,credit_card,500 +21,18,credit_card,800 +22,19,gift_card,600 +23,20,bank_transfer,1500 +24,21,credit_card,1200 +25,22,bank_transfer,800 +26,23,gift_card,2300 +27,24,coupon,2600 +28,25,bank_transfer,2000 +29,25,credit_card,2200 +30,25,coupon,1600 +31,26,credit_card,3000 +32,27,credit_card,2300 +33,28,bank_transfer,1900 +34,29,bank_transfer,1200 +35,30,credit_card,1300 +36,31,credit_card,1200 +37,32,credit_card,300 +38,33,credit_card,2200 +39,34,bank_transfer,1500 +40,35,credit_card,2900 +41,36,bank_transfer,900 +42,37,credit_card,2300 +43,38,credit_card,1500 +44,39,bank_transfer,800 +45,40,credit_card,1400 +46,41,credit_card,1700 +47,42,coupon,1700 +48,43,gift_card,1800 +49,44,gift_card,1100 +50,45,bank_transfer,500 +51,46,bank_transfer,800 +52,47,credit_card,2200 +53,48,bank_transfer,300 +54,49,credit_card,600 +55,49,credit_card,900 +56,50,credit_card,2600 +57,51,credit_card,2900 +58,51,credit_card,100 +59,52,bank_transfer,1500 +60,53,credit_card,300 +61,54,credit_card,1800 +62,54,bank_transfer,1100 +63,55,credit_card,2900 +64,56,credit_card,400 +65,57,bank_transfer,200 +66,58,coupon,1800 +67,58,gift_card,600 +68,59,gift_card,2800 +69,60,credit_card,400 +70,61,bank_transfer,1600 +71,62,gift_card,1400 +72,63,credit_card,2900 +73,64,bank_transfer,2600 +74,65,credit_card,0 +75,66,credit_card,2800 +76,67,bank_transfer,400 +77,67,credit_card,1900 +78,68,credit_card,1600 +79,69,credit_card,1900 +80,70,credit_card,2600 +81,71,credit_card,500 +82,72,credit_card,2900 +83,73,bank_transfer,300 +84,74,credit_card,3000 +85,75,credit_card,1900 +86,76,coupon,200 +87,77,credit_card,0 +88,77,bank_transfer,1900 +89,78,bank_transfer,2600 +90,79,credit_card,1800 +91,79,credit_card,900 +92,80,gift_card,300 +93,81,coupon,200 +94,82,credit_card,800 +95,83,credit_card,100 +96,84,bank_transfer,2500 +97,85,bank_transfer,1700 +98,86,coupon,2300 +99,87,gift_card,3000 +100,87,credit_card,2600 +101,88,credit_card,2900 +102,89,bank_transfer,2200 +103,90,bank_transfer,200 +104,91,credit_card,1900 +105,92,bank_transfer,1500 +106,92,coupon,200 +107,93,gift_card,2600 +108,94,coupon,700 +109,95,coupon,2400 +110,96,gift_card,1700 +111,97,bank_transfer,1400 +112,98,bank_transfer,1000 +113,99,credit_card,2400 diff --git a/dev/dags/simple_dag_async.py b/dev/dags/simple_dag_async.py index 787461236..1b2b67651 100644 --- a/dev/dags/simple_dag_async.py +++ b/dev/dags/simple_dag_async.py @@ -2,8 +2,8 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig -from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) @@ -11,29 +11,32 @@ profile_config = ProfileConfig( profile_name="default", target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="example_conn", - profile_args={"schema": "public"}, - disable_event_tracking=True, + profile_mapping=GoogleCloudServiceAccountDictProfileMapping( + conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"} ), ) + # [START airflow_async_execution_mode_example] 
simple_dag_async = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", + DBT_ROOT_PATH / "original_jaffle_shop", ), profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.AIRFLOW_ASYNC, ), + render_config=RenderConfig( + select=["path:models"], + # test_behavior=TestBehavior.NONE + ), # normal dag parameters schedule_interval=None, start_date=datetime(2023, 1, 1), catchup=False, dag_id="simple_dag_async", tags=["simple"], - operator_args={"install_deps": True}, + operator_args={"full_refresh": True, "location": "northamerica-northeast1"}, ) # [END airflow_async_execution_mode_example] diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst index ec150992d..10f6cce67 100644 --- a/docs/getting_started/execution-modes.rst +++ b/docs/getting_started/execution-modes.rst @@ -243,6 +243,7 @@ Each task will create a new Cloud Run Job execution, giving full isolation. The }, ) + Airflow Async (experimental) ---------------------------- @@ -268,9 +269,20 @@ deferrable operators and supplying to them those compiled SQLs. Note that currently, the ``airflow_async`` execution mode has the following limitations and is released as Experimental: -1. Only supports the ``dbt resource type`` models to be run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands as they are in the ``local`` execution mode. -2. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode. -3. Only works for ``full_refresh`` models. There is pending work to support other modes. +1. This feature only works with Airflow 2.8 and above. +2. Only the ``model`` dbt resource type is run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands, as they are in the ``local`` execution mode. +3. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode. +4. Only works for ``full_refresh`` models. There is pending work to support other modes. +5. Only the BigQuery profile type is supported. +6. Users need to provide the ``ProfileMapping`` parameter in ``ProfileConfig``. +7. It does not support datasets. + +You can leverage async operator support by installing an additional dependency: + +.. code:: bash + + astronomer-cosmos[dbt-bigquery, google] + Example DAG: @@ -279,6 +291,35 @@ Example DAG: :start-after: [START airflow_async_execution_mode_example] :end-before: [END airflow_async_execution_mode_example] +**Known Issue:** + +The ``dag test`` command fails with the following error, likely because the trigger does not fully initialize during ``dag test``, leading to an uninitialized task instance. +This causes the BigQuery trigger to attempt to access Task Instance parameters that are not properly initialized. + +..
code:: bash + + [2024-10-01T18:19:09.726+0530] {base_events.py:1738} ERROR - unhandled exception during asyncio.run() shutdown + task: ()> exception=AttributeError("'NoneType' object has no attribute 'dag_id'")> + Traceback (most recent call last): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 138, in run + yield TriggerEvent( + asyncio.exceptions.CancelledError + + During handling of the above exception, another exception occurred: + + Traceback (most recent call last): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 157, in run + if self.job_id and self.cancel_on_kill and self.safe_to_cancel(): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 126, in safe_to_cancel + task_instance = self.get_task_instance() # type: ignore[call-arg] + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper + return func(*args, session=session, **kwargs) + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 102, in get_task_instance + TaskInstance.dag_id == self.task_instance.dag_id, + AttributeError: 'NoneType' object has no attribute 'dag_id' + + + .. _invocation_modes: Invocation Modes ================ diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index bb936fc21..6992b8f15 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -1,5 +1,5 @@ pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y -pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 +pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 dbt-bigquery==1.5.4 export SOURCE_RENDERING_BEHAVIOR=all rm -rf airflow.*; \ airflow db init; \ diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index c6e106fd5..fec9e95eb 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -11,4 +11,4 @@ rm -rf airflow.* pip freeze | grep airflow airflow db reset -y airflow db init -pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' +pip install 'dbt-core' 'dbt-bigquery' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 6fc7cdc0a..1bd8cab35 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,7 +1,7 @@ import os from datetime import datetime from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from airflow import __version__ as airflow_version @@ -30,7 +30,7 @@ ) from cosmos.converter import airflow_kwargs from cosmos.dbt.graph import DbtNode -from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.profiles import GoogleCloudServiceAccountFileProfileMapping, PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) @@ -228,19 +228,21 @@ def test_build_airflow_graph_with_after_all(): 
@pytest.mark.integration +@patch("airflow.hooks.base.BaseHook.get_connection", new=MagicMock()) def test_build_airflow_graph_with_dbt_compile_task(): + bigquery_profile_config = ProfileConfig( + profile_name="my-bigquery-db", + target_name="dev", + profile_mapping=GoogleCloudServiceAccountFileProfileMapping( + conn_id="fake_conn", profile_args={"dataset": "release_17"} + ), + ) with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: task_args = { "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", - "profile_config": ProfileConfig( - profile_name="default", - target_name="default", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="fake_conn", - profile_args={"schema": "public"}, - ), - ), + "profile_config": bigquery_profile_config, + "location": "", } render_config = RenderConfig( select=["tag:some"], @@ -318,7 +320,7 @@ def test_create_task_metadata_unsupported(caplog): tags=[], config={}, ) - response = create_task_metadata(child_node, execution_mode="", args={}) + response = create_task_metadata(child_node, execution_mode="", args={}, dbt_dag_task_group_identifier="") assert response is None expected_msg = ( "Unavailable conversion function for (node ). " @@ -337,6 +339,7 @@ def test_create_task_metadata_unsupported(caplog): "cosmos.operators.local.DbtRunLocalOperator", {"models": "my_model"}, { + "dbt_dag_task_group_identifier": "", "dbt_node_config": { "unique_id": "model.my_folder.my_model", "resource_type": "model", @@ -347,7 +350,7 @@ def test_create_task_metadata_unsupported(caplog): "has_test": False, "resource_name": "my_model", "name": "my_model", - } + }, }, ), ( @@ -377,6 +380,7 @@ def test_create_task_metadata_unsupported(caplog): "cosmos.operators.local.DbtSnapshotLocalOperator", {"models": "my_snapshot"}, { + "dbt_dag_task_group_identifier": "", "dbt_node_config": { "unique_id": "snapshot.my_folder.my_snapshot", "resource_type": "snapshot", @@ -411,7 +415,9 @@ def test_create_task_metadata_model( has_freshness=True, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) if metadata: assert metadata.id == expected_id assert metadata.operator_class == expected_operator_class @@ -428,7 +434,9 @@ def test_create_task_metadata_model_with_versions(caplog): tags=[], config={}, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) assert metadata.id == "my_model_v1_run" assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator" assert metadata.arguments == {"models": "my_model.v1"} @@ -443,7 +451,9 @@ def test_create_task_metadata_model_use_task_group(caplog): tags=[], config={}, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_task_group=True) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_task_group=True, dbt_dag_task_group_identifier="" + ) assert metadata.id == "run" @@ -498,7 +508,11 @@ def test_create_task_metadata_source_with_rendering_options( ) metadata = create_task_metadata( - child_node, execution_mode=ExecutionMode.LOCAL, source_rendering_behavior=source_rendering_behavior, args={} + child_node, + execution_mode=ExecutionMode.LOCAL, + 
source_rendering_behavior=source_rendering_behavior, + args={}, + dbt_dag_task_group_identifier="", ) if metadata: assert metadata.id == expected_id @@ -516,12 +530,15 @@ def test_create_task_metadata_seed(caplog, use_task_group): config={}, ) if use_task_group is None: - metadata = create_task_metadata(sample_node, execution_mode=ExecutionMode.DOCKER, args={}) + metadata = create_task_metadata( + sample_node, execution_mode=ExecutionMode.DOCKER, args={}, dbt_dag_task_group_identifier="" + ) else: metadata = create_task_metadata( sample_node, execution_mode=ExecutionMode.DOCKER, args={}, + dbt_dag_task_group_identifier="", use_task_group=use_task_group, ) @@ -543,7 +560,9 @@ def test_create_task_metadata_snapshot(caplog): tags=[], config={}, ) - metadata = create_task_metadata(sample_node, execution_mode=ExecutionMode.KUBERNETES, args={}) + metadata = create_task_metadata( + sample_node, execution_mode=ExecutionMode.KUBERNETES, args={}, dbt_dag_task_group_identifier="" + ) assert metadata.id == "my_snapshot_snapshot" assert metadata.operator_class == "cosmos.operators.kubernetes.DbtSnapshotKubernetesOperator" assert metadata.arguments == {"models": "my_snapshot"} diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4174c9a2d..1c0912042 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1522,7 +1522,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "c1e25b0679b5ddcb636bcc30f2f85a06" + assert hash_dir == "25beeb54cc4eeabe6198248e286a1cfe" else: assert hash_dir == "6f63493009733a7be34364a6ea3ffd3c" diff --git a/tests/operators/test_airflow_async.py b/tests/operators/test_airflow_async.py index fc085c7d0..ec2f5e715 100644 --- a/tests/operators/test_airflow_async.py +++ b/tests/operators/test_airflow_async.py @@ -1,10 +1,11 @@ +import pytest +from airflow import __version__ as airflow_version +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from packaging import version + from cosmos.operators.airflow_async import ( DbtBuildAirflowAsyncOperator, DbtCompileAirflowAsyncOperator, - DbtDocsAirflowAsyncOperator, - DbtDocsAzureStorageAirflowAsyncOperator, - DbtDocsGCSAirflowAsyncOperator, - DbtDocsS3AirflowAsyncOperator, DbtLSAirflowAsyncOperator, DbtRunAirflowAsyncOperator, DbtRunOperationAirflowAsyncOperator, @@ -16,12 +17,7 @@ from cosmos.operators.local import ( DbtBuildLocalOperator, DbtCompileLocalOperator, - DbtDocsAzureStorageLocalOperator, - DbtDocsGCSLocalOperator, - DbtDocsLocalOperator, - DbtDocsS3LocalOperator, DbtLSLocalOperator, - DbtRunLocalOperator, DbtRunOperationLocalOperator, DbtSeedLocalOperator, DbtSnapshotLocalOperator, @@ -50,8 +46,12 @@ def test_dbt_source_airflow_async_operator_inheritance(): assert issubclass(DbtSourceAirflowAsyncOperator, DbtSourceLocalOperator) +@pytest.mark.skipif( + version.parse(airflow_version) < version.parse("2.8"), + reason="Cosmos Async operators only work with Airflow 2.8 onwards.", +) def test_dbt_run_airflow_async_operator_inheritance(): - assert issubclass(DbtRunAirflowAsyncOperator, DbtRunLocalOperator) + assert issubclass(DbtRunAirflowAsyncOperator, BigQueryInsertJobOperator) def test_dbt_test_airflow_async_operator_inheritance(): @@ -62,21 +62,5 @@ def test_dbt_run_operation_airflow_async_operator_inheritance(): assert issubclass(DbtRunOperationAirflowAsyncOperator, 
DbtRunOperationLocalOperator) -def test_dbt_docs_airflow_async_operator_inheritance(): - assert issubclass(DbtDocsAirflowAsyncOperator, DbtDocsLocalOperator) - - -def test_dbt_docs_s3_airflow_async_operator_inheritance(): - assert issubclass(DbtDocsS3AirflowAsyncOperator, DbtDocsS3LocalOperator) - - -def test_dbt_docs_azure_storage_airflow_async_operator_inheritance(): - assert issubclass(DbtDocsAzureStorageAirflowAsyncOperator, DbtDocsAzureStorageLocalOperator) - - -def test_dbt_docs_gcs_airflow_async_operator_inheritance(): - assert issubclass(DbtDocsGCSAirflowAsyncOperator, DbtDocsGCSLocalOperator) - - def test_dbt_compile_airflow_async_operator_inheritance(): assert issubclass(DbtCompileAirflowAsyncOperator, DbtCompileLocalOperator) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index c7615225f..ed954dfdf 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1147,7 +1147,7 @@ def test_dbt_compile_local_operator_initialisation(): @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") -@patch("cosmos.operators.local.AIRFLOW_IO_AVAILABLE", new=False) +@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False) def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions(): operator = DbtCompileLocalOperator( task_id="fake-task", @@ -1242,6 +1242,7 @@ def test_upload_compiled_sql_should_upload(mock_configure_remote, mock_object_st profile_config=profile_config, project_dir="fake-dir", dag=DAG("test_dag", start_date=datetime(2024, 4, 16)), + extra_context={"dbt_dag_task_group_identifier": "test_dag"}, ) mock_configure_remote.return_value = ("mock_remote_path", "mock_conn_id") diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 9aa66432d..6c7e98802 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -68,6 +68,10 @@ def get_dag_bag() -> DagBag: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) + # Ignore Async DAG for dbt <=1.5 + if DBT_VERSION <= Version("1.5.0"): + file.writelines(["simple_dag_async.py\n"]) + # The dbt sqlite adapter is only available until dbt 1.4 if DBT_VERSION >= Version("1.5.0"): file.writelines(["example_cosmos_sources.py\n"]) @@ -98,4 +102,10 @@ def test_example_dag(session, dag_id: str): return dag_bag = get_dag_bag() dag = dag_bag.get_dag(dag_id) - test_utils.run_dag(dag) + + # This feature is available since Airflow 2.5 and we've backported it in Cosmos: + # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02 + if AIRFLOW_VERSION >= Version("2.5"): + dag.test() + else: + test_utils.run_dag(dag) diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index 0cc560ecc..3a43a644c 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -43,6 +43,10 @@ def get_dag_bag() -> DagBag: print(f"Adding {dagfile} to .airflowignore") file.writelines([f"{dagfile}\n"]) + # Ignore Async DAG for dbt <=1.5 + if DBT_VERSION <= Version("1.5.0"): + file.writelines(["simple_dag_async.py\n"]) + if DBT_VERSION >= Version("1.5.0"): file.writelines(["example_cosmos_sources.py\n"]) if DBT_VERSION < Version("1.6.0"):