This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Decouple hook executor from the operator, add Cloud Build execution and adjust testing #48

Status: Open · wants to merge 79 commits into base: master

Commits (79)
0fb5be6
Decouple hook executor from the operator, add Cloud Build execution a…
dinigo Oct 18, 2021
e53b744
Add hook to run in google cloud build
dinigo Oct 18, 2021
b922f21
Fix base operator
dinigo Oct 18, 2021
f2fb018
Test GCP Cloud Build hook
dinigo Oct 18, 2021
ad837ee
Fix cloudbuild logic
dinigo Oct 18, 2021
543d744
Clean unused imports and fix path to upload dbt sources in GCS
dinigo Oct 18, 2021
5ac7f2a
Fix the upload sources test
dinigo Oct 18, 2021
53243c0
Add docstrings to all DbtBaseOperator subclasses
dinigo Oct 18, 2021
644f594
Add `env` param to DbtBaseOperator to pass env variables to runtime
dinigo Oct 18, 2021
5cc650d
Export the DbtCloudBuildHook from the hooks package
dinigo Oct 22, 2021
db5e121
Check that google provider API version is the right for the implement…
dinigo Oct 22, 2021
f55624a
Add project_dir flag to the command line
dinigo Oct 22, 2021
ef26f88
Remove cloud build timeout and wait. That's Google provider API v6, not 5
dinigo Oct 22, 2021
129aa93
If dbt_bin is provided None or empty just don't add it to the command
dinigo Oct 22, 2021
289d563
Add `project_dir` option flag
dinigo Oct 22, 2021
081222d
Add the `use_color` flag (#43). If set adds the dbt flag
dinigo Oct 22, 2021
bff2d5c
Template all the fields from the DbtBaseOperator (#46 and #30)
dinigo Oct 22, 2021
6a459e8
Refactor each hook into its own file
dinigo Oct 25, 2021
dcaa3bb
Remove upload functionality, provided by `LocalFilesystemToGCSOperator`
dinigo Oct 25, 2021
f234900
Adjust imports after refactor
dinigo Oct 25, 2021
c582899
Test Cloud Build DBT hook and remove test cloudbuild upload
dinigo Oct 25, 2021
efdb64d
Fix test params for generate_dbt_cli_command
dinigo Oct 25, 2021
678989c
Export DbtBaseOperator and DbtCloudBuildOperator from the .operators pkg
dinigo Oct 25, 2021
3ff8c4f
Add missing dbt flags, add optional config and Config type to enforce it
dinigo Oct 25, 2021
457f916
Remove dbt_bin from constructors and defer hook instantiation
dinigo Oct 26, 2021
5c3159c
Fix test for TestDbtCliHook checking return code different from 0
dinigo Oct 26, 2021
bec7fe5
Bring back "dir" to fix back the API, add warnings
dinigo Oct 26, 2021
0c7587a
DbtCleanOperator was missing from operator exports (__init__)
dinigo Oct 26, 2021
ce732b2
Fix typo (thanks @fenimore) and reorganize imports
dinigo Oct 26, 2021
8ea9958
Comply with linter (flake8)
dinigo Oct 26, 2021
f1c84fe
Refactor, move google operator out of the dbt_operator
dinigo Oct 27, 2021
a3e1213
Display Cloud Build logs in the Airflow logs
dinigo Oct 27, 2021
7776c45
Get rid of __init__ files exporting classes from different places
dinigo Oct 27, 2021
2f8b9e7
Fix TestDbtCloudBuildHook with the new config sent to Cloud Build
dinigo Oct 27, 2021
3c90bc5
Remove unused DbtCloudBuildHook param `dbt_bin`
dinigo Oct 27, 2021
abd4bfe
Test extensively command generation, execution in cli, hook injection...
dinigo Nov 2, 2021
13b2559
Replace explicit generate_dbt_cli_command with generic one
dinigo Nov 2, 2021
0435b58
Docs adjustments mainly
dinigo Nov 2, 2021
e3736f5
Flags are passed True to be reflected in the command line.
dinigo Nov 2, 2021
82ec4f4
Make abstract method implement a single LOC `pass`. thanks @fenimore
dinigo Dec 7, 2021
b505d59
Explain conditional import of TypedDict as suggested by @andrewrjones
dinigo Dec 7, 2021
239e05f
Reuse type in different conditionals, suggested by @johanna-ojeling-d…
dinigo Dec 7, 2021
0cc0205
Make project_id optional in the DbtCloudBuildHook
dinigo Dec 7, 2021
a1dbbc7
Fix typo in pydoc
dinigo Dec 7, 2021
cb0566d
Make `env` dict for environment variables for google dbt hook optional
dinigo Dec 7, 2021
d4e5184
Make `service_account` for google dbt hook optional
dinigo Dec 7, 2021
98c90f0
Shorten up conditional assignment of project_id
dinigo Dec 7, 2021
33af351
Make `env` dict for environment variables for base dbt hook optional
dinigo Dec 7, 2021
139bfe3
Shorten up conditional assignment of env dict
dinigo Dec 7, 2021
c6b3699
Make `env` for base dbt hook optional
dinigo Dec 7, 2021
face550
Add descriptions to each parametrized dbt test
dinigo Dec 7, 2021
6f7a507
Add README.md code examples
dinigo Dec 7, 2021
1c41b36
Split check values in two for readability
dinigo Dec 7, 2021
52884ae
Extract provider version check to a function and test it, also typos
dinigo Dec 7, 2021
df62625
Fix conditionals comparing with None since None is `falsy` in python
dinigo Dec 7, 2021
642f6dd
Fix python files not being detected in setup by `find_packages`:
dinigo Jan 13, 2022
614309b
Bump version
dinigo Jan 13, 2022
23e16a6
Make operator font color white, easier to read against a mid-dark bg
dinigo Jan 13, 2022
ce219a0
Allow false flags and reorder dbt render
dinigo Jan 14, 2022
22ddab1
Share DbtBaseOperator props between subclasses
dinigo Jan 14, 2022
de1fec4
Ignore deprecated @apply_defaults, it's needed for backwards compat
dinigo Jan 14, 2022
0f6e313
Fix dbt config, some params for dbt and some for the specific command
dinigo Jan 14, 2022
f2075f5
Allow a custom image other than the official from fishtown
dinigo Jan 14, 2022
b279112
Add DbtCloudBuildOperator templated fields to the DbtBaseOperator ones
dinigo Jan 14, 2022
957f76b
Bump dbt docker image for cloud build
dinigo Jan 14, 2022
a2a3346
Cloud build hook can raise silently. Surround with a try/catch
dinigo Jan 14, 2022
4515199
Add default conn_id, dbt_version and dbt_image and simpler cloudbuild…
dinigo Jan 14, 2022
f42cdfe
Fix compound commands like 'dbt docs generate' not running in GCP
dinigo Feb 1, 2022
0c1bcf4
Export artifacts (`target` folder) when running in cloud build:
dinigo Feb 1, 2022
697a2ba
Enhance type safety
dinigo Feb 1, 2022
460d361
Use `Dict` instead of the default new `dict` for backwards compat
dinigo Mar 8, 2022
7cab0fd
use the cloud build library instead of the hook
dinigo Mar 8, 2022
b3dbdb6
Ignore terraform, dbt and pycharm related files
dinigo Mar 8, 2022
e1969f8
Bump version
dinigo Mar 8, 2022
9766bbc
chore: update default dbt-image to github registry
dinigo Oct 24, 2022
f1744a8
feat: add debug operator
dinigo Oct 24, 2022
f34d22e
chore: update default dbt docker image versions, part 2
dinigo Oct 24, 2022
09548af
chore: simpler and standard __version__ so that compilation picks it
dinigo Oct 24, 2022
98692ed
ci: replace old build system with the new default pyproject
dinigo Oct 24, 2022
8 changes: 8 additions & 0 deletions .gitignore
@@ -133,3 +133,11 @@ dmypy.json

# pytype static type analyzer
.pytype/

.run/

.terraform/

.user.yaml

.idea/
68 changes: 66 additions & 2 deletions README.md
@@ -67,7 +67,7 @@ There are five operators currently implemented:
* Calls [`dbt test`](https://docs.getdbt.com/docs/test)


Each of the above operators accept the following arguments:
Each of the above operators accepts the arguments defined in [dbt_command_config](airflow_dbt/dbt_command_config.py). The main ones are:

* `profiles_dir`
* If set, passed as the `--profiles-dir` argument to the `dbt` command
@@ -96,6 +96,68 @@ Typically you will want to use the `DbtRunOperator`, followed by the `DbtTestOpe

You can also use the hook directly. Typically this is useful when you need to combine the `dbt` command with another task in the same operator, for example running `dbt docs` and uploading the docs to somewhere they can be served from, as sketched below.

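For example, a minimal sketch of running `dbt docs generate` through the `DbtCliHook` and then publishing the output (the project path and the publishing step are hypothetical):

```python
from airflow_dbt.hooks.base import generate_dbt_cli_command
from airflow_dbt.hooks.cli import DbtCliHook


def docs_generate_and_publish():
    # Build the CLI list, e.g.
    # ['dbt', '--no-use-colors', 'docs', 'generate', '--project-dir', ...]
    dbt_cmd = generate_dbt_cli_command(
        dbt_bin='dbt',
        command='docs generate',
        base_config={'use_colors': False},
        command_config={'project_dir': '/usr/local/airflow/dags/dbt'},
    )
    # Run it in the worker through a subprocess
    DbtCliHook().run_dbt(dbt_cmd)
    # ...then upload the generated ./target/ folder to wherever the docs
    # are served from, e.g. with LocalFilesystemToGCSOperator
```

Such a callable could then be wired into a `PythonOperator`.
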
## A more advanced example

If you want to run your `dbt` project somewhere other than the Airflow worker,
you can pass a `DbtCloudBuildHook` to the `DbtBaseOperator`, or simply use the
provided `DbtCloudBuildOperator`:

```python
from airflow_dbt.hooks import DbtCloudBuildHook
from airflow_dbt.operators import DbtBaseOperator, DbtCloudBuildOperator

DbtBaseOperator(
    task_id='provide_hook',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
    dbt_hook=DbtCloudBuildHook(
        gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz'
    )
)

DbtCloudBuildOperator(
    task_id='default_hook_cloudbuild',
    gcs_staging_location='gs://my-bucket/compressed-dbt-project.tar.gz',
    command='run',
    use_colors=False,
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
    },
)
```

You can either pass the dbt params/config/flags directly to the operator or
group them into a single `config` param. Both approaches are validated, but only
`config` is templated (a short sketch of templating follows the example). The
following two tasks are equivalent:

```python
from airflow_dbt.operators.dbt_operator import DbtBaseOperator

DbtBaseOperator(
    task_id='config_param',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        'dbt_bin': '/usr/local/airflow/.local/bin/dbt',
        'use_colors': False
    }
)

DbtBaseOperator(
    task_id='flat_config',
    command='run',
    profiles_dir='./jaffle-shop',
    project_dir='./jaffle-shop',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    use_colors=False
)
```
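
Because `config` is templated, Jinja expressions inside it are rendered at run
time. A brief sketch, assuming `config` is part of the operator's
`template_fields` (as the commit history suggests); the `vars` value is purely
illustrative:

```python
from airflow_dbt.operators.dbt_operator import DbtBaseOperator

DbtBaseOperator(
    task_id='templated_config',
    command='run',
    config={
        'profiles_dir': './jaffle-shop',
        'project_dir': './jaffle-shop',
        # rendered by Airflow templating before the CLI command is built,
        # then serialized to JSON and passed as --vars
        'vars': {'run_date': '{{ ds }}'},
    },
)
```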

## Building Locally

To install from the repository:
@@ -147,7 +209,9 @@ If you use MWAA, you just need to update the `requirements.txt` file and add `ai
Then you can have your dbt code inside a folder `{DBT_FOLDER}` in the dags folder on S3 and configure the dbt task like below:

```python
dbt_run = DbtRunOperator(
from airflow_dbt.operators.dbt_operator import DbtRunOperator

dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
9 changes: 0 additions & 9 deletions airflow_dbt/__init__.py
@@ -1,9 +0,0 @@
from .hooks import DbtCliHook
from .operators import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator,
    DbtDocsGenerateOperator,
    DbtDepsOperator
)
4 changes: 1 addition & 3 deletions airflow_dbt/__version__.py
@@ -1,3 +1 @@
VERSION = (0, 4, 0)

__version__ = '.'.join(map(str, VERSION))
__version__ = "0.5.10"
69 changes: 69 additions & 0 deletions airflow_dbt/dbt_command_config.py
@@ -0,0 +1,69 @@
import sys

# Python versions older than 3.8 provide TypedDict in typing_extensions
# instead of typing, so in that case we fall back to the older import
if sys.version_info >= (3, 8):
    from typing import TypedDict
else:
    from typing_extensions import TypedDict


class DbtCommandConfig(TypedDict, total=False):
"""
Holds the structure of a dictionary containing dbt config. Provides the
types and names for each one, and also helps shortening the constructor
since we can nest it and reuse it
"""
# global flags
version: bool
record_timing_info: bool
debug: bool
log_format: str # either 'text', 'json' or 'default'
write_json: bool
strict: bool
warn_error: bool
partial_parse: bool
use_experimental_parser: bool
use_colors: bool
no_use_colors: bool

# per command flags
profiles_dir: str
project_dir: str
target: str
vars: dict
models: str
exclude: str

# run specific
full_refresh: bool
profile: str

# docs specific
no_compile: bool

# debug specific
config_dir: str

# ls specific
resource_type: str # models, snapshots, seeds, tests, and sources.
select: str
models: str
exclude: str
selector: str
output: str
output_keys: str

# rpc specific
host: str
port: int

# run specific
fail_fast: bool

# run-operation specific
args: dict

# test specific
data: bool
schema: bool
1 change: 0 additions & 1 deletion airflow_dbt/hooks/__init__.py
@@ -1 +0,0 @@
from .dbt_hook import DbtCliHook
98 changes: 98 additions & 0 deletions airflow_dbt/hooks/base.py
@@ -0,0 +1,98 @@
import json
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union

# noinspection PyDeprecation
from airflow.hooks.base_hook import BaseHook

from airflow_dbt.dbt_command_config import DbtCommandConfig


def render_config(config: Dict[str, Union[str, bool]]) -> List[str]:
    """Renders a dictionary of options into a list of cli strings"""
    dbt_command_config_annotations = DbtCommandConfig.__annotations__
    command_params = []
    for key, value in config.items():
        if key not in dbt_command_config_annotations:
            raise ValueError(f"{key} is not a valid key")
        if value is not None:
            param_value_type = type(value)
            # check that the value has the correct type from
            # dbt_command_config_annotations
            if param_value_type != dbt_command_config_annotations[key]:
                raise TypeError(f"{key} has to be of type {dbt_command_config_annotations[key]}")
            # boolean False flags are rendered with a 'no-' prefix
            # (e.g. use_colors=False becomes --no-use-colors)
            flag_prefix = ''
            if param_value_type is bool and not value:
                flag_prefix = 'no-'
            cli_param_from_kwarg = "--" + flag_prefix + key.replace("_", "-")
            command_params.append(cli_param_from_kwarg)
            # non-boolean params also append their value after the flag
            if param_value_type is str:
                command_params.append(value)
            elif param_value_type is int:
                command_params.append(str(value))
            elif param_value_type is dict:
                command_params.append(json.dumps(value))
    return command_params


def generate_dbt_cli_command(
    dbt_bin: str,
    command: str,
    base_config: Dict[str, Union[str, bool]],
    command_config: Dict[str, Union[str, bool]],
) -> List[str]:
    """
    Creates a CLI command list from the provided configs. Keys set to None
    are ignored. Boolean keys are rendered as flags (a False value renders
    the negated `--no-` flag). String, integer and dict keys are rendered as
    `--key value` pairs, with dicts serialized to JSON.
    dbt_bin and command are mandatory.

    :param base_config: Params that apply to the `dbt` program regardless of
        the command it is running
    :type base_config: dict
    :param command_config: Specific params for the command
    :type command_config: dict
    :param dbt_bin: Path to the dbt binary. Defaults to `dbt`, which assumes
        it is available in the PATH.
    :type dbt_bin: str
    :param command: The dbt sub-command to run, for example for `dbt run`
        the command will be `run`. Any other flag not contemplated here can
        also be added to this string
    :type command: str
    """
    if not dbt_bin:
        raise ValueError("dbt_bin is mandatory")
    if not command:
        raise ValueError("command is mandatory")
    base_params = render_config(base_config)
    command_params = render_config(command_config)
    # commands like 'dbt docs generate' need the command to be split in two
    command_pieces = command.split(" ")
    return [dbt_bin, *base_params, *command_pieces, *command_params]


class DbtBaseHook(BaseHook, ABC):
    """
    Base abstract class for all DbtHooks, providing a common interface and
    forcing subclasses to implement the mandatory `run_dbt()` method.
    """

    def __init__(self, env: Optional[Dict] = None):
        """
        :param env: If set, environment variables that will be passed to the
            dbt execution environment
        :type env: dict
        """
        super().__init__()
        self.env = env or {}

    @abstractmethod
    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
        """Run the dbt command"""
        pass
51 changes: 51 additions & 0 deletions airflow_dbt/hooks/cli.py
@@ -0,0 +1,51 @@
from __future__ import print_function

from typing import Any, Dict, List, Optional, Union

from airflow import AirflowException
from airflow.hooks.subprocess import SubprocessHook

from airflow_dbt.hooks.base import DbtBaseHook


class DbtCliHook(DbtBaseHook):
    """
    Run the dbt command in the same Airflow worker the task is being run on.
    This requires the `dbt` python package to be installed in it first.
    """

    def __init__(self, env: Optional[Dict] = None):
        """
        :param env: Environment variables that will be passed to the
            subprocess. Must be a dictionary of key-values
        :type env: dict
        """
        self.sp = SubprocessHook()
        super().__init__(env=env)

    def get_conn(self) -> Any:
        """
        Return the subprocess connection, which isn't implemented, just for
        conformity
        """
        return self.sp.get_conn()

    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
        """
        Run the dbt cli

        :param dbt_cmd: The whole dbt command to run
        :type dbt_cmd: List[str]
        """
        result = self.sp.run_command(
            command=dbt_cmd,
            env=self.env,
        )

        if result.exit_code != 0:
            raise AirflowException(f'Error executing the DBT command: '
                                   f'{result.output}')

    def on_kill(self):
        """Kill the open subprocess if the task gets killed by Airflow"""
        self.sp.send_sigterm()