diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 5ceafe11c..b827b9ce5 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -25,16 +25,15 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
+        with:
+          ref: ${{ github.event.pull_request.head.sha || github.ref }}
+
       - uses: actions/setup-python@v3
         with:
           python-version: '3.9'
           architecture: 'x64'
-      - uses: actions/cache@v3
-        with:
-          path: |
-            ~/.cache/pip
-          key: ${{ runner.os }}-${{ hashFiles('pyproject.toml') }}
-      - run: pip3 install hatch mypy
+
+      - run: pip3 install hatch
       - run: hatch run tests.py3.9-2.7:type-check
 
   Run-Unit-Tests:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 635c394ec..7acebacc4 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -50,13 +50,13 @@ repos:
           - --py37-plus
           - --keep-runtime-typing
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.0.287
+    rev: v0.0.288
     hooks:
       - id: ruff
         args:
           - --fix
   - repo: https://github.com/psf/black
-    rev: 23.7.0
+    rev: 23.9.1
     hooks:
       - id: black
         args: [ "--config", "./pyproject.toml" ]
diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py
index 9e750ddab..d03af5f1f 100644
--- a/cosmos/airflow/graph.py
+++ b/cosmos/airflow/graph.py
@@ -50,8 +50,40 @@ def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[st
     return leaves
 
 
+def create_test_task_metadata(
+    test_task_name: str,
+    execution_mode: ExecutionMode,
+    task_args: dict[str, Any],
+    on_warning_callback: Callable[..., Any] | None = None,
+    model_name: str | None = None,
+) -> TaskMetadata:
+    """
+    Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node.
+
+    :param test_task_name: Name of the Airflow task to be created
+    :param execution_mode: The Cosmos execution mode we're aiming to run the dbt task at (e.g. local)
+    :param task_args: Arguments to be used to instantiate an Airflow Task
+    :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
+        and "test_results" of type List.
+    :param model_name: If the test relates to a specific model, the name of the model it relates to
+    :returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
+    """
+    task_args = dict(task_args)
+    task_args["on_warning_callback"] = on_warning_callback
+    if model_name is not None:
+        task_args["models"] = model_name
+    return TaskMetadata(
+        id=test_task_name,
+        operator_class=calculate_operator_class(
+            execution_mode=execution_mode,
+            dbt_class="DbtTest",
+        ),
+        arguments=task_args,
+    )
+
+
 def create_task_metadata(
-    node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_name_as_task_id_prefix=True
+    node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_name_as_task_id_prefix: bool = True
 ) -> TaskMetadata | None:
     """
     Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
@@ -94,38 +126,6 @@ def create_task_metadata(
     return None
 
 
-def create_test_task_metadata(
-    test_task_name: str,
-    execution_mode: ExecutionMode,
-    task_args: dict[str, Any],
-    on_warning_callback: Callable[..., Any] | None = None,
-    model_name: str | None = None,
-) -> TaskMetadata:
-    """
-    Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node.
-
-    :param test_task_name: Name of the Airflow task to be created
-    :param execution_mode: The Cosmos execution mode we're aiming to run the dbt task at (e.g. local)
-    :param task_args: Arguments to be used to instantiate an Airflow Task
-    :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
-        and "test_results" of type List.
-    :param model_name: If the test relates to a specific model, the name of the model it relates to
-    :returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
-    """
-    task_args = dict(task_args)
-    task_args["on_warning_callback"] = on_warning_callback
-    if model_name is not None:
-        task_args["models"] = model_name
-    return TaskMetadata(
-        id=test_task_name,
-        operator_class=calculate_operator_class(
-            execution_mode=execution_mode,
-            dbt_class="DbtTest",
-        ),
-        arguments=task_args,
-    )
-
-
 def build_airflow_graph(
     nodes: dict[str, DbtNode],
     dag: DAG,  # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
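
For context, a minimal usage sketch of the relocated create_test_task_metadata helper, e.g. from a unit test. The import paths and the ExecutionMode.LOCAL member are assumptions not shown in this diff; the call signature and the returned TaskMetadata fields follow the function moved above.

# Minimal usage sketch (assumed import paths; not part of this diff).
from cosmos.airflow.graph import create_test_task_metadata  # assumed module path
from cosmos.constants import ExecutionMode  # assumed location of ExecutionMode


def warn(context):
    # Receives the extra Context variables described in the docstring.
    print(context["test_names"], context["test_results"])


metadata = create_test_task_metadata(
    test_task_name="customers_test",  # hypothetical task name
    execution_mode=ExecutionMode.LOCAL,  # assumed enum member, matching the "local" example in the docstring
    task_args={"project_dir": "/usr/local/airflow/dbt"},  # hypothetical operator arguments
    on_warning_callback=warn,
    model_name="customers",  # hypothetical model; ends up in task_args["models"]
)

# metadata.id == "customers_test"; metadata.operator_class is whatever
# calculate_operator_class returns for (ExecutionMode.LOCAL, "DbtTest"); and
# metadata.arguments carries "on_warning_callback" and "models" alongside the
# original task_args, which the function copies rather than mutating the caller's dict.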