Skip to content

Commit

Permalink
Merge upstream changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ayobamshy committed Sep 23, 2020
2 parents 842da10 + 054891e commit 0a25f8a
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 9 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,22 @@ Removed the operator `DbtDepsOperator`. Dependencies defined in dbt's `packages.

* `DbtDepsOperator`
* Calls [`dbt deps`](https://docs.getdbt.com/docs/deps)


# v0.1.1

Makes `vars` a jinja templated field. See [here](https://airflow.apache.org/docs/stable/concepts.html#jinja-templating) for more information.

# v0.1.2

Fix verbose logging of command to include `--full_refresh`.

# v0.2.0

Add the `DbtSnapshotOperator`.

# v0.3.0

Add the `DbtSeedOperator`.

Support `--select` for the `DbtSnapshotOperator`.
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,30 @@
This is a collection of [Airflow](https://airflow.apache.org/) operators to provide easy integration with [dbt](https://www.getdbt.com).

```py
from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
dbt_dir = '/srv/app/dbt'
'dir': '/srv/app/dbt',
'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

dbt_seed = DbtSeedOperator(
task_id='dbt_seed',
)

dbt_snapshot = DbtSnapshotOperator(
task_id='dbt_snapshot',
)

dbt_run = DbtRunOperator(
task_id='dbt_run',
)
Expand All @@ -18,7 +36,7 @@ with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as
retries=0, # Failing tests would fail the task, and we don't want Airflow to try again
)

dbt_run >> dbt_test
dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test
```

## Installation
Expand All @@ -33,13 +51,18 @@ It will also need access to the `dbt` CLI, which should either be on your `PATH`

## Usage

There are two operators currently implemented:
There are four operators currently implemented:

* `DbtSeedOperator`
* Calls [`dbt seed`](https://docs.getdbt.com/docs/seed)
* `DbtSnapshotOperator`
* Calls [`dbt snapshot`](https://docs.getdbt.com/docs/snapshot)
* `DbtRunOperator`
* Calls [`dbt run`](https://docs.getdbt.com/docs/run)
* `DbtTestOperator`
* Calls [`dbt test`](https://docs.getdbt.com/docs/test)


Each of the above operators accept the following arguments:

* `profiles_dir`
Expand All @@ -56,6 +79,8 @@ Each of the above operators accept the following arguments:
* If set, passed as the `--models` argument to the `dbt` command
* `exclude`
* If set, passed as the `--exclude` argument to the `dbt` command
* `select`
* If set, passed as the `--select` argument to the `dbt` command
* `dbt_bin`
* The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
* `verbose`
Expand Down
8 changes: 7 additions & 1 deletion airflow_dbt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
from .hooks import DbtCliHook
from .operators import DbtRunOperator, DbtTestOperator, DbtDocsGenerateOperator
from .operators import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator,
DbtDocsGenerateOperator
)
2 changes: 1 addition & 1 deletion airflow_dbt/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
VERSION = (0, 1, 0)
VERSION = (0, 3, 0)

__version__ = '.'.join(map(str, VERSION))
11 changes: 9 additions & 2 deletions airflow_dbt/hooks/dbt_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class DbtCliHook(BaseHook):
:type models: str
:param exclude: If set, passed as the `--exclude` argument to the `dbt` command
:type exclude: str
:param select: If set, passed as the `--select` argument to the `dbt` command
:type select: str
:param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
:type dbt_bin: str
:param output_encoding: Output encoding of bash command. Defaults to utf-8
Expand All @@ -43,6 +45,7 @@ def __init__(self,
schema_test=False,
models=None,
exclude=None,
select=None,
dbt_bin='dbt',
output_encoding='utf-8',
verbose=True):
Expand All @@ -55,6 +58,7 @@ def __init__(self,
self.schema_test = schema_test
self.models = models
self.exclude = exclude
self.select = select
self.dbt_bin = dbt_bin
self.verbose = verbose
self.output_encoding = output_encoding
Expand Down Expand Up @@ -96,12 +100,15 @@ def run_cli(self, *command):
if self.exclude is not None:
dbt_cmd.extend(['--exclude', self.exclude])

if self.verbose:
self.log.info(" ".join(dbt_cmd))
if self.select is not None:
dbt_cmd.extend(['--select', self.select])

if self.full_refresh:
dbt_cmd.extend(['--full-refresh'])

if self.verbose:
self.log.info(" ".join(dbt_cmd))

sp = subprocess.Popen(
dbt_cmd,
stdout=subprocess.PIPE,
Expand Down
9 changes: 9 additions & 0 deletions airflow_dbt/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
<<<<<<< HEAD
from .dbt_operator import DbtRunOperator, DbtTestOperator, DbtDocsGenerateOperator
=======
from .dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)
>>>>>>> upstream/master
23 changes: 22 additions & 1 deletion airflow_dbt/operators/dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class DbtBaseOperator(BaseOperator):
:type models: str
:param exclude: If set, passed as the `--exclude` argument to the `dbt` command
:type exclude: str
:param select: If set, passed as the `--select` argument to the `dbt` command
:type select: str
:param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
:type dbt_bin: str
:param verbose: The operator will log verbosely to the Airflow logs
Expand All @@ -40,6 +42,7 @@ def __init__(self,
vars=None,
models=None,
exclude=None,
select=None,
dbt_bin='dbt',
verbose=True,
full_refresh=False,
Expand All @@ -58,6 +61,7 @@ def __init__(self,
self.data_test = data_test
self.schema_test = schema_test
self.exclude = exclude
self.select = select
self.dbt_bin = dbt_bin
self.verbose = verbose
self.create_hook()
Expand All @@ -73,6 +77,7 @@ def create_hook(self):
schema_test=self.schema_test,
models=self.models,
exclude=self.exclude,
select=self.select,
dbt_bin=self.dbt_bin,
verbose=self.verbose)

Expand Down Expand Up @@ -103,4 +108,20 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtDocsGenerateOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('docs', 'generate')
self.create_hook().run_cli('docs', 'generate')
class DbtSnapshotOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtSnapshotOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('snapshot')


class DbtSeedOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtSeedOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('seed')
25 changes: 24 additions & 1 deletion tests/operators/test_dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from unittest import TestCase, mock
from airflow import DAG, configuration
from airflow_dbt.hooks.dbt_hook import DbtCliHook
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator
from airflow_dbt.operators.dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)


class TestDbtOperator(TestCase):
Expand Down Expand Up @@ -31,3 +36,21 @@ def test_dbt_test(self, mock_run_cli):
)
operator.execute(None)
mock_run_cli.assert_called_once_with('test')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_snapshot(self, mock_run_cli):
operator = DbtSnapshotOperator(
task_id='snapshot',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('snapshot')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_seed(self, mock_run_cli):
operator = DbtSeedOperator(
task_id='seed',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('seed')

0 comments on commit 0a25f8a

Please sign in to comment.