Skip to content

Commit

Permalink
Merge pull request gocardless#25 from gocardless/dbt-seed-operator
Browse files Browse the repository at this point in the history
Add DbtSeedOperator, test and update readme
  • Loading branch information
d-swift authored Sep 18, 2020
2 parents e3aa3f7 + 9684141 commit 65fe472
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 8 deletions.
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ This is a collection of [Airflow](https://airflow.apache.org/) operators to prov

```py
from airflow import DAG
from airflow_dbt.operators.dbt_operator import DbtSnapshotOperator, DbtRunOperator, DbtTestOperator
from airflow_dbt.operators.dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
Expand All @@ -14,6 +19,10 @@ default_args = {

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:

dbt_seed = DbtSeedOperator(
task_id='dbt_seed',
)

dbt_snapshot = DbtSnapshotOperator(
task_id='dbt_snapshot',
)
Expand All @@ -27,7 +36,7 @@ with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as
retries=0, # Failing tests would fail the task, and we don't want Airflow to try again
)

dbt_snapshot >> dbt_run >> dbt_test
dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test
```

## Installation
Expand All @@ -42,15 +51,18 @@ It will also need access to the `dbt` CLI, which should either be on your `PATH`

## Usage

There are three operators currently implemented:
There are four operators currently implemented:

* `DbtRunOperator`
* Calls [`dbt run`](https://docs.getdbt.com/docs/run)
* `DbtSeedOperator`
* Calls [`dbt seed`](https://docs.getdbt.com/docs/seed)
* `DbtSnapshotOperator`
* Calls [`dbt snapshot`](https://docs.getdbt.com/docs/snapshot)
* `DbtRunOperator`
* Calls [`dbt run`](https://docs.getdbt.com/docs/run)
* `DbtTestOperator`
* Calls [`dbt test`](https://docs.getdbt.com/docs/test)


Each of the above operators accept the following arguments:

* `profiles_dir`
Expand Down
7 changes: 6 additions & 1 deletion airflow_dbt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
from .hooks import DbtCliHook
from .operators import DbtRunOperator, DbtTestOperator
from .operators import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)
7 changes: 6 additions & 1 deletion airflow_dbt/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from .dbt_operator import DbtRunOperator, DbtTestOperator
from .dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)
9 changes: 9 additions & 0 deletions airflow_dbt/operators/dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,12 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):

def execute(self, context):
self.create_hook().run_cli('snapshot')


class DbtSeedOperator(DbtBaseOperator):
@apply_defaults
def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
super(DbtSeedOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)

def execute(self, context):
self.create_hook().run_cli('seed')
16 changes: 15 additions & 1 deletion tests/operators/test_dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from unittest import TestCase, mock
from airflow import DAG, configuration
from airflow_dbt.hooks.dbt_hook import DbtCliHook
from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtTestOperator, DbtSnapshotOperator
from airflow_dbt.operators.dbt_operator import (
DbtSeedOperator,
DbtSnapshotOperator,
DbtRunOperator,
DbtTestOperator
)


class TestDbtOperator(TestCase):
Expand Down Expand Up @@ -40,3 +45,12 @@ def test_dbt_snapshot(self, mock_run_cli):
)
operator.execute(None)
mock_run_cli.assert_called_once_with('snapshot')

@mock.patch.object(DbtCliHook, 'run_cli')
def test_dbt_seed(self, mock_run_cli):
operator = DbtSeedOperator(
task_id='seed',
dag=self.dag
)
operator.execute(None)
mock_run_cli.assert_called_once_with('seed')

0 comments on commit 65fe472

Please sign in to comment.