Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #60 from RafalSiwek/master
Browse files Browse the repository at this point in the history
Add the functionality to parse env parameter to the airflow worker
  • Loading branch information
andrewrjones authored May 27, 2022
2 parents aaf3fcd + 6be0e27 commit 17339d3
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ Support `--select` for the `DbtSnapshotOperator`.

# v0.4.0

Add the `DbtDocsGenerateOperator` and `DbtDepsOperator`.
Add the `DbtDocsGenerateOperator` and `DbtDepsOperator`.
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ There are five operators currently implemented:

Each of the above operators accept the following arguments:

* `env`
* If set as a kwarg dict, passed the given environment variables as the arguments to the dbt task
* `profiles_dir`
* If set, passed as the `--profiles-dir` argument to the `dbt` command
* `target`
Expand Down Expand Up @@ -164,6 +166,41 @@ dbt_run = DbtRunOperator(
)
```

## Templating and parsing environments variables

If you would like to run DBT using custom profile definition template with environment-specific variables, like for example profiles.yml using jinja:
```yaml
<profile_name>:
outputs:
<source>:
database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}"
password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}"
threads: "{{ env_var('DBT_THREADS') }}"
type: <type>
user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}"
target: <source>
```
You can pass the environment variables via the `env` kwarg parameter:

```python
import os
...
dbt_run = DbtRunOperator(
task_id='dbt_run',
env={
'DBT_ENV_SECRET_DATABASE': '<DATABASE>',
'DBT_ENV_SECRET_PASSWORD': '<PASSWORD>',
'DBT_ENV_SECRET_SCHEMA': '<SCHEMA>',
'USER_NAME': '<USER_NAME>',
'DBT_THREADS': os.getenv('<DBT_THREADS_ENV_VARIABLE_NAME>'),
'ENV_NAME': os.getenv('ENV_NAME')
}
)
```

## License & Contributing

* This is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).
Expand Down
5 changes: 5 additions & 0 deletions airflow_dbt/hooks/dbt_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class DbtCliHook(BaseHook):
"""
Simple wrapper around the dbt CLI.
:param env: If set, passes the env variables to the subprocess handler
:type env: dict
:param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command
:type profiles_dir: str
:param target: If set, passed as the `--target` argument to the `dbt` command
Expand Down Expand Up @@ -40,6 +42,7 @@ class DbtCliHook(BaseHook):
"""

def __init__(self,
env=None,
profiles_dir=None,
target=None,
dir='.',
Expand All @@ -55,6 +58,7 @@ def __init__(self,
output_encoding='utf-8',
verbose=True,
warn_error=False):
self.env = env or {}
self.profiles_dir = profiles_dir
self.dir = dir
self.target = target
Expand Down Expand Up @@ -125,6 +129,7 @@ def run_cli(self, *command):

sp = subprocess.Popen(
dbt_cmd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self.dir,
Expand Down
7 changes: 6 additions & 1 deletion airflow_dbt/operators/dbt_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class DbtBaseOperator(BaseOperator):
Base dbt operator
All other dbt operators are derived from this operator.
:param env: If set, passes the env variables to the subprocess handler
:type env: dict
:param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command
:type profiles_dir: str
:param target: If set, passed as the `--target` argument to the `dbt` command
Expand Down Expand Up @@ -36,10 +38,11 @@ class DbtBaseOperator(BaseOperator):

ui_color = '#d6522a'

template_fields = ['vars']
template_fields = ['env', 'vars']

@apply_defaults
def __init__(self,
env=None,
profiles_dir=None,
target=None,
dir='.',
Expand All @@ -58,6 +61,7 @@ def __init__(self,
**kwargs):
super(DbtBaseOperator, self).__init__(*args, **kwargs)

self.env = env or {}
self.profiles_dir = profiles_dir
self.target = target
self.dir = dir
Expand All @@ -76,6 +80,7 @@ def __init__(self,

def create_hook(self):
self.hook = DbtCliHook(
env=self.env,
profiles_dir=self.profiles_dir,
target=self.target,
dir=self.dir,
Expand Down
Empty file modified tests/__init__.py
100644 → 100755
Empty file.
27 changes: 27 additions & 0 deletions tests/hooks/test_dbt_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_sub_commands(self, mock_subproc_popen):
'docs',
'generate'
],
env={},
close_fds=True,
cwd='.',
stdout=subprocess.PIPE,
Expand All @@ -47,8 +48,34 @@ def test_vars(self, mock_subproc_popen):
'--vars',
'{"foo": "bar", "baz": "true"}'
],
env={},
close_fds=True,
cwd='.',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)

@mock.patch('subprocess.Popen')
def test_envs(self, mock_subproc_popen):
mock_subproc_popen.return_value \
.communicate.return_value = ('output', 'error')
mock_subproc_popen.return_value.returncode = 0
mock_subproc_popen.return_value \
.stdout.readline.side_effect = [b"placeholder"]

hook = DbtCliHook(vars={"foo": "bar", "baz": "true"}, env={"foo": "bar", "baz": "true"})
hook.run_cli('run')

mock_subproc_popen.assert_called_once_with(
[
'dbt',
'run',
'--vars',
'{"foo": "bar", "baz": "true"}'
],
env={"foo": "bar", "baz": "true"},
close_fds=True,
cwd='.',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)

0 comments on commit 17339d3

Please sign in to comment.