This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Decouple hook executor from the operator, add Cloud Build execution and adjust testing #48

Open
wants to merge 79 commits into
base: master
Changes from 30 commits
79 commits
0fb5be6
Decouple hook executor from the operator, add Cloud Build execution a…
dinigo Oct 18, 2021
e53b744
Add hook to run in google cloud build
dinigo Oct 18, 2021
b922f21
Fix base operator
dinigo Oct 18, 2021
f2fb018
Test GCP Cloud Build hook
dinigo Oct 18, 2021
ad837ee
Fix cloudbuild logic
dinigo Oct 18, 2021
543d744
Clean unused imports and fix path to upload dbt sources in GCS
dinigo Oct 18, 2021
5ac7f2a
Fix the upload sources test
dinigo Oct 18, 2021
53243c0
Add docstrings to all DbtBaseOperator subclases
dinigo Oct 18, 2021
644f594
Add `env` param to DbtBaseOperator to pass env variables to runtime
dinigo Oct 18, 2021
5cc650d
Export the DbtCloudBuildHook from the hooks package
dinigo Oct 22, 2021
db5e121
Check that google provider API version is the right for the implement…
dinigo Oct 22, 2021
f55624a
Add project_dir flag to the command line
dinigo Oct 22, 2021
ef26f88
Remove cloud build timeout and wait. Thats google provider api v6, not 5
dinigo Oct 22, 2021
129aa93
If dbt_bin is provided None or empty just don't add it to the command
dinigo Oct 22, 2021
289d563
Add `project_dir` option flag
dinigo Oct 22, 2021
081222d
Add the `use_color` flag (#43). If set adds the dbt flag
dinigo Oct 22, 2021
bff2d5c
Template all the fields from the DbtBaseOperator (#46 and #30)
dinigo Oct 22, 2021
6a459e8
Refactor each hook into it's own file
dinigo Oct 25, 2021
dcaa3bb
Remove upload functionality, provided by `LocalFilesystemToGCSOperator`
dinigo Oct 25, 2021
f234900
Adjust imports after refactor
dinigo Oct 25, 2021
c582899
Test Cloud Build DBT hook and remove test cloudbuild upload
dinigo Oct 25, 2021
efdb64d
Fix test params for generate_dbt_cli_command
dinigo Oct 25, 2021
678989c
Export DbtBaseOperator and DbtCloudBuildOperator from the .operators pkg
dinigo Oct 25, 2021
3ff8c4f
Add missing dbt flags, add optional config and Config type to enforce it
dinigo Oct 25, 2021
457f916
Remove dbt_bin from constructors and defer hook instantiation
dinigo Oct 26, 2021
5c3159c
Fix test for TestDbtCliHook checking return code different from 0
dinigo Oct 26, 2021
bec7fe5
Bring back "dir" to fix back the API, add warnings
dinigo Oct 26, 2021
0c7587a
DbtCleanOperator was missing from operator exports (__init__)
dinigo Oct 26, 2021
ce732b2
Fix typo (thanks @fenimore) and reorganize imports
dinigo Oct 26, 2021
8ea9958
Comply with linter (flake8)
dinigo Oct 26, 2021
f1c84fe
Refactor, move google operator out of the dbt_operator
dinigo Oct 27, 2021
a3e1213
Display Cloud Build logs in the Airflow logs
dinigo Oct 27, 2021
7776c45
Get rid of __init__ files exporting classes from different places
dinigo Oct 27, 2021
2f8b9e7
Fix TestDbtCloudBuildHook with the new config sent to Cloud Build
dinigo Oct 27, 2021
3c90bc5
Remove unused DbtCloudBuildHook param `dbt_bin`
dinigo Oct 27, 2021
abd4bfe
Test extensively command generation, execution in cli, hook injection...
dinigo Nov 2, 2021
13b2559
Replace explicit generate_dbt_cli_command with generic one
dinigo Nov 2, 2021
0435b58
Docs adjustments mainly
dinigo Nov 2, 2021
e3736f5
Flags are passed True to be reflected in the command line.
dinigo Nov 2, 2021
82ec4f4
Make abstract method implement a single LOC `pass`. thanks @fenimore
dinigo Dec 7, 2021
b505d59
Explain conditional import of TypedDict as suggested by @andrewrjones
dinigo Dec 7, 2021
239e05f
Reuse type in different conditionals, suggested by @johanna-ojeling-d…
dinigo Dec 7, 2021
0cc0205
Make project_id optional in the DbtCloudBuildHook
dinigo Dec 7, 2021
a1dbbc7
Fix typo in pydoc
dinigo Dec 7, 2021
cb0566d
Make `env` dict for environment variables for google dbt hook optional
dinigo Dec 7, 2021
d4e5184
Make `service_account ` for google dbt hook optional
dinigo Dec 7, 2021
98c90f0
Shorten up conditional assignment of project_id
dinigo Dec 7, 2021
33af351
Make `env` dict for environment variables for base dbt hook optional
dinigo Dec 7, 2021
139bfe3
Shorten up conditional assignment of env dict
dinigo Dec 7, 2021
c6b3699
Make `env ` for base dbt hook optional
dinigo Dec 7, 2021
face550
Add descriptions to each parametrized dbt test
dinigo Dec 7, 2021
6f7a507
Add README.md code examples
dinigo Dec 7, 2021
1c41b36
Split check values in two for readability
dinigo Dec 7, 2021
52884ae
Extract provider version check to a function and test it, also typos
dinigo Dec 7, 2021
df62625
Fix conditionals comparing with None since None is `falsy` in python
dinigo Dec 7, 2021
642f6dd
Fix python files not being detected in setup by `find_packages`:
dinigo Jan 13, 2022
614309b
Bump version
dinigo Jan 13, 2022
23e16a6
Make operator font color white, easier to read against a mid-dark bg
dinigo Jan 13, 2022
ce219a0
Allow false flags and reorder dbt render
dinigo Jan 14, 2022
22ddab1
Share DbtBaseOperator props between subclasses
dinigo Jan 14, 2022
de1fec4
Ignore deprecated @apply_defaults, It's needed for backwards compat
dinigo Jan 14, 2022
0f6e313
Fix dbt config, some params for dbt and some for the specific command
dinigo Jan 14, 2022
f2075f5
Allow a custom image other than the official from fishtown
dinigo Jan 14, 2022
b279112
Add DbtCloudBuildOperator templated fields to the DbtBaseOperator ones
dinigo Jan 14, 2022
957f76b
Bump dbt docker image for cloud build
dinigo Jan 14, 2022
a2a3346
Cloud build hook can raise silently. Surround with a try/catch
dinigo Jan 14, 2022
4515199
Add default conn_id, dbt_version and dbt_image and simpler cloudbuild…
dinigo Jan 14, 2022
f42cdfe
Fix compoundd commands like 'dbt docs generate' not running in GCP
dinigo Feb 1, 2022
0c1bcf4
Export artifacts (`target` folder) when running in cloud build:
dinigo Feb 1, 2022
697a2ba
Enhance type safety
dinigo Feb 1, 2022
460d361
Use `Dict` instead of the default new `dict` for backwards compat
dinigo Mar 8, 2022
7cab0fd
use the cloud build library instead of the hook
dinigo Mar 8, 2022
b3dbdb6
Ignore terraform, dbt and pycharm related files
dinigo Mar 8, 2022
e1969f8
Bump version
dinigo Mar 8, 2022
9766bbc
chore: update default dbt-imge to github registry
dinigo Oct 24, 2022
f1744a8
feat: add debug operator
dinigo Oct 24, 2022
f34d22e
chore: update default dbt docker image versions, part 2
dinigo Oct 24, 2022
09548af
chore: simpler and standard __version__ so that compilation picks it
dinigo Oct 24, 2022
98692ed
ci: replace old build system with the new default pyproject
dinigo Oct 24, 2022
68 changes: 68 additions & 0 deletions airflow_dbt/dbt_command_config.py
@@ -0,0 +1,68 @@
import sys

if sys.version_info >= (3, 8):
    from typing import TypedDict
else:
    from typing_extensions import TypedDict


class DbtCommandConfig(TypedDict, total=False):
    """
    Holds the structure of a dictionary containing dbt config. Provides the
    types and names for each one, and also helps shorten the constructor
    since we can nest it and reuse it.
    """
    # global flags
    version: bool
    record_timing_info: bool
    debug: bool
    log_format: str  # either 'text', 'json' or 'default'
    write_json: bool
    strict: bool
    warn_error: bool
    partial_parse: bool
    use_experimental_parser: bool
    use_colors: bool

    # per command flags
    profiles_dir: str
    project_dir: str
    target: str
    vars: dict
    models: str
    exclude: str
    verbose: bool

    # run specific
    full_refresh: bool
    profile: str

    # docs specific
    no_compile: bool

    # debug specific
    config_dir: str

    # ls specific
    resource_type: str  # models, snapshots, seeds, tests, and sources.
    # '--resource-type'
    select: str
    # `models` and `exclude` also apply to ls. They are declared once, with
    # the per command flags above, to avoid duplicate TypedDict keys
    selector: str
    output: str
    output_keys: str

    # rpc specific
    host: str
    port: int

    # run specific
    fail_fast: bool

    # run-operation specific
    args: dict

    # test specific
    data: bool
    schema: bool
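A minimal sketch of how a `total=False` TypedDict like the one above can be used, assuming Python 3.8+. `ExampleDbtConfig` and `merge_configs` are hypothetical names made up for this illustration and are not part of the PR. Because every key is optional, a partial dictionary type-checks, and shared settings can be merged with per-task overrides.

```python
from typing import TypedDict  # Python 3.8+; older versions need typing_extensions


class ExampleDbtConfig(TypedDict, total=False):
    """Hypothetical, trimmed-down stand-in for DbtCommandConfig."""
    target: str
    models: str
    full_refresh: bool


def merge_configs(defaults: ExampleDbtConfig,
                  overrides: ExampleDbtConfig) -> ExampleDbtConfig:
    """Return a new config where keys in `overrides` win over `defaults`."""
    merged: ExampleDbtConfig = {**defaults, **overrides}
    return merged


# total=False makes every key optional, so a partial dict is a valid config.
shared: ExampleDbtConfig = {'target': 'prod'}
run_config = merge_configs(
    shared,
    {'models': 'my_model', 'full_refresh': True},
)
```

At runtime a TypedDict is a plain `dict`, so the merged config can be unpacked with `**run_config` into any function that accepts the same keyword arguments.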
4 changes: 3 additions & 1 deletion airflow_dbt/hooks/__init__.py
@@ -1 +1,3 @@
-from .dbt_hook import DbtCliHook
+from .base import DbtBaseHook
+from .cli import DbtCliHook
+from .google import DbtCloudBuildHook
225 changes: 225 additions & 0 deletions airflow_dbt/hooks/base.py
@@ -0,0 +1,225 @@
from __future__ import print_function

import json
from abc import ABC, abstractmethod
from typing import Dict, List, Union

from airflow.hooks.base_hook import BaseHook


class DbtBaseHook(BaseHook, ABC):
    """
    Abstract base for hooks that run dbt. It builds the dbt CLI command and
    leaves the execution to each subclass through `run_dbt`.

    :param env: If set, passed to the dbt executor
    :type env: dict
    """

    def __init__(self, env: Dict = None):
        super().__init__()
        self.env = env if env is not None else {}

    def generate_dbt_cli_command(
        self,
        dbt_bin: str = None,
        command: str = None,
        # global flags
        version: bool = False,
        record_timing_info: bool = False,
        debug: bool = False,
        log_format: str = None,  # either 'text', 'json' or 'default'
        write_json: bool = None,
        strict: bool = False,
        warn_error: bool = False,
        partial_parse: bool = False,
        use_experimental_parser: bool = False,
        use_colors: bool = None,
        # command specific config
        profiles_dir: str = None,
        project_dir: str = None,
        profile: str = None,
        target: str = None,
        config_dir: str = None,
        resource_type: str = None,
        vars: Dict = None,
        # run specific
        full_refresh: bool = False,
        # test specific
        data: bool = False,
        schema: bool = False,
        # node selection, ls specific
        models: str = None,
        exclude: str = None,
        select: str = None,
        selector: str = None,
        output: str = None,
        output_keys: str = None,
        # rpc specific
        host: str = None,
        port: int = None,
        # run specific
        fail_fast: bool = False,
        # run-operation specific
        args: dict = None,
        # docs specific
        no_compile: bool = False,
    ) -> List[str]:
        """
        Generate the command that will be run based on class properties,
        presets and dbt commands

        :param dbt_bin: Path to the `dbt` executable. If `None` or empty the
            command is generated without it
        :type dbt_bin: str
        :param command: The dbt sub-command to run
        :type command: str
        :param profiles_dir: If set, passed as the `--profiles-dir` argument to
            the `dbt` command
        :type profiles_dir: str
        :param project_dir: If set, passed as the `--project-dir` argument to
            the `dbt` command
        :type project_dir: str
        :param target: If set, passed as the `--target` argument to the `dbt`
            command
        :type target: str
        :param vars: If set, serialized to JSON and passed as the `--vars`
            argument to the `dbt` command
        :type vars: dict
        :param full_refresh: If `True`, will fully-refresh incremental models.
        :type full_refresh: bool
        :param data: If `True`, adds the `--data` flag
        :type data: bool
        :param schema: If `True`, adds the `--schema` flag
        :type schema: bool
        :param models: If set, passed as the `--models` argument to the `dbt`
            command
        :type models: str
        :param warn_error: If `True`, treat warnings as errors.
        :type warn_error: bool
        :param exclude: If set, passed as the `--exclude` argument to the `dbt`
            command
        :type exclude: str
        :param use_colors: If set, adds the flag `--use-colors` when `True`
            or `--no-use-colors` when `False`.
        :type use_colors: bool
        :param select: If set, passed as the `--select` argument to the `dbt`
            command
        :type select: str
        :return: The command as a list of arguments
        :rtype: List[str]
        """

        dbt_cmd: List[str] = []

        # if there's no bin do not append it. Rather generate the command
        # without the `/path/to/dbt` prefix. That is useful for running it
        # inside containers that have already set the entrypoint.
        if dbt_bin:
            dbt_cmd.append(dbt_bin)

        # add global flags at the beginning
        if version:
            dbt_cmd.append('--version')

        if record_timing_info:
            dbt_cmd.append('--record-timing-info')

        if debug:
            dbt_cmd.append('--debug')

        if log_format is not None:
            dbt_cmd.extend(['--log-format', log_format])

        if write_json is not None:
            write_json_flag = '--write-json' if write_json else \
                '--no-write-json'
            dbt_cmd.append(write_json_flag)

        if strict:
            dbt_cmd.append('--strict')

        if warn_error:
            dbt_cmd.append('--warn-error')

        if partial_parse:
            dbt_cmd.append('--partial-parse')

        if use_experimental_parser:
            dbt_cmd.append('--use-experimental-parser')

        if use_colors is not None:
            colors_flag = "--use-colors" if use_colors else "--no-use-colors"
            dbt_cmd.append(colors_flag)

        # appends the main command
        dbt_cmd.append(command)

        # appends configuration relative to the command
        if profiles_dir is not None:
            dbt_cmd.extend(['--profiles-dir', profiles_dir])

        if project_dir is not None:
            dbt_cmd.extend(['--project-dir', project_dir])

        if profile is not None:
            dbt_cmd.extend(['--profile', profile])

        if target is not None:
            dbt_cmd.extend(['--target', target])

        # debug specific
        if config_dir is not None:
            dbt_cmd.extend(['--config-dir', config_dir])

        # ls specific
        if resource_type is not None:
            dbt_cmd.extend(['--resource-type', resource_type])

        if select is not None:
            dbt_cmd.extend(['--select', select])

        if models is not None:
            dbt_cmd.extend(['--models', models])

        if exclude is not None:
            dbt_cmd.extend(['--exclude', exclude])

        if selector is not None:
            dbt_cmd.extend(['--selector', selector])

        if output is not None:
            dbt_cmd.extend(['--output', output])

        if output_keys is not None:
            dbt_cmd.extend(['--output-keys', output_keys])

        # rpc specific
        if host is not None:
            dbt_cmd.extend(['--host', host])

        if port is not None:
            dbt_cmd.extend(['--port', str(port)])

        # run specific
        if full_refresh:
            dbt_cmd.append('--full-refresh')

        if fail_fast:
            dbt_cmd.append('--fail-fast')

        if vars is not None:
            dbt_cmd.extend(['--vars', json.dumps(vars)])

        # run-operation specific
        if args is not None:
            dbt_cmd.extend(['--args', json.dumps(args)])

        # test specific
        if data:
            dbt_cmd.append('--data')

        if schema:
            dbt_cmd.append('--schema')

        if no_compile:
            dbt_cmd.append('--no-compile')

        return dbt_cmd

    @abstractmethod
    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
        """Run the dbt command"""
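To make the flag rendering in `generate_dbt_cli_command` easier to follow, here is a standalone sketch that covers only a handful of the parameters. `build_dbt_command` is a hypothetical helper written for this note, not the PR's method. It keeps the same ordering rules: optional binary first, global flags before the sub-command, command-specific flags after it, and dictionaries serialized to JSON.

```python
import json
from typing import Any, Dict, List, Optional


def build_dbt_command(
    command: str,
    dbt_bin: Optional[str] = None,
    use_colors: Optional[bool] = None,
    models: Optional[str] = None,
    full_refresh: bool = False,
    vars: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Hypothetical, reduced version of the command generation logic."""
    cmd: List[str] = []

    # Skip the binary when it is None or empty, e.g. inside a container
    # whose entrypoint is already `dbt`.
    if dbt_bin:
        cmd.append(dbt_bin)

    # Tri-state global flag: None omits it, True/False pick the variant.
    if use_colors is not None:
        cmd.append('--use-colors' if use_colors else '--no-use-colors')

    # Global flags go before the sub-command, specific flags after it.
    cmd.append(command)

    if models is not None:
        cmd.extend(['--models', models])

    # Plain boolean flags are only emitted when True.
    if full_refresh:
        cmd.append('--full-refresh')

    # Dictionaries are serialized to JSON and passed as one argument.
    if vars is not None:
        cmd.extend(['--vars', json.dumps(vars)])

    return cmd


example = build_dbt_command(
    'run',
    dbt_bin='dbt',
    use_colors=False,
    models='my_model',
    full_refresh=True,
    vars={'day': '2021-10-18'},
)
# example == ['dbt', '--no-use-colors', 'run', '--models', 'my_model',
#             '--full-refresh', '--vars', '{"day": "2021-10-18"}']
```

Returning a list of arguments rather than one string means the command can be handed to a subprocess without shell quoting, which matters for the JSON passed to `--vars`.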
56 changes: 56 additions & 0 deletions airflow_dbt/hooks/cli.py
@@ -0,0 +1,56 @@
from __future__ import print_function

from typing import Any, Dict, List, Union

from airflow.exceptions import AirflowException
from airflow.hooks.subprocess import SubprocessHook

from .base import DbtBaseHook


class DbtCliHook(DbtBaseHook):
    """
    Run the dbt command on the same Airflow worker where the task runs.
    This requires the `dbt` python package to be installed on that worker.
    The `dbt` executable might not be on the `PATH`, so the full path may
    have to be given as `dbt_bin` when generating the command.

    :param env: If set, passed to the dbt executor
    :type env: dict
    """

    def __init__(self, env: Dict = None):
        self.sp = SubprocessHook()
        super().__init__(env=env)

    def get_conn(self) -> Any:
        """
        Return the subprocess connection, which isn't implemented, just for
        conformity
        """
        return self.sp.get_conn()

    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
        """
        Run the dbt cli

        :param dbt_cmd: The whole dbt command to run
        :type dbt_cmd: List[str]
        """
        result = self.sp.run_command(
            command=dbt_cmd,
            env=self.env,
        )

        if result.exit_code != 0:
            raise AirflowException(
                f'Error executing the DBT command: {result.output}'
            )

    def on_kill(self):
        """Kill the open subprocess if the task gets killed by Airflow"""
        self.sp.send_sigterm()
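The run-and-check pattern in `run_dbt` can be tried without Airflow. The sketch below uses only the standard library as a stand-in for `SubprocessHook`. `CommandFailed` and `run_command` are hypothetical names for this illustration. Merging `env` on top of the current process environment is a choice made for this sketch; how `SubprocessHook` treats `env` is not shown in this diff.

```python
import os
import subprocess
import sys
from typing import Dict, List, Optional


class CommandFailed(Exception):
    """Hypothetical error type, standing in for AirflowException."""


def run_command(cmd: List[str], env: Optional[Dict[str, str]] = None) -> str:
    """Run `cmd`, return its combined output, raise on a non-zero exit."""
    # Assumption of this sketch: extra variables extend the current
    # environment instead of replacing it.
    full_env = {**os.environ, **(env or {})}
    completed = subprocess.run(
        cmd,
        env=full_env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        text=True,
    )
    if completed.returncode != 0:
        raise CommandFailed(
            f'Command exited with code {completed.returncode}: '
            f'{completed.stdout}'
        )
    return completed.stdout


# The child process sees the variable passed through `env`.
output = run_command(
    [sys.executable, '-c', 'import os; print(os.environ["DBT_TARGET"])'],
    env={'DBT_TARGET': 'dev'},
)
```

Raising on a non-zero exit code is what lets the scheduler mark the task as failed, and including the captured output in the message keeps the dbt error visible in the task log.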